~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

TOMOYO Linux Cross Reference
Linux/fs/dlm/rcom.c

Version: ~ [ linux-6.11.5 ] ~ [ linux-6.10.14 ] ~ [ linux-6.9.12 ] ~ [ linux-6.8.12 ] ~ [ linux-6.7.12 ] ~ [ linux-6.6.58 ] ~ [ linux-6.5.13 ] ~ [ linux-6.4.16 ] ~ [ linux-6.3.13 ] ~ [ linux-6.2.16 ] ~ [ linux-6.1.114 ] ~ [ linux-6.0.19 ] ~ [ linux-5.19.17 ] ~ [ linux-5.18.19 ] ~ [ linux-5.17.15 ] ~ [ linux-5.16.20 ] ~ [ linux-5.15.169 ] ~ [ linux-5.14.21 ] ~ [ linux-5.13.19 ] ~ [ linux-5.12.19 ] ~ [ linux-5.11.22 ] ~ [ linux-5.10.228 ] ~ [ linux-5.9.16 ] ~ [ linux-5.8.18 ] ~ [ linux-5.7.19 ] ~ [ linux-5.6.19 ] ~ [ linux-5.5.19 ] ~ [ linux-5.4.284 ] ~ [ linux-5.3.18 ] ~ [ linux-5.2.21 ] ~ [ linux-5.1.21 ] ~ [ linux-5.0.21 ] ~ [ linux-4.20.17 ] ~ [ linux-4.19.322 ] ~ [ linux-4.18.20 ] ~ [ linux-4.17.19 ] ~ [ linux-4.16.18 ] ~ [ linux-4.15.18 ] ~ [ linux-4.14.336 ] ~ [ linux-4.13.16 ] ~ [ linux-4.12.14 ] ~ [ linux-4.11.12 ] ~ [ linux-4.10.17 ] ~ [ linux-4.9.337 ] ~ [ linux-4.4.302 ] ~ [ linux-3.10.108 ] ~ [ linux-2.6.32.71 ] ~ [ linux-2.6.0 ] ~ [ linux-2.4.37.11 ] ~ [ unix-v6-master ] ~ [ ccs-tools-1.8.9 ] ~ [ policy-sample ] ~
Architecture: ~ [ i386 ] ~ [ alpha ] ~ [ m68k ] ~ [ mips ] ~ [ ppc ] ~ [ sparc ] ~ [ sparc64 ] ~

  1 // SPDX-License-Identifier: GPL-2.0-only
  2 /******************************************************************************
  3 *******************************************************************************
  4 **
  5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  6 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
  7 **
  8 **
  9 *******************************************************************************
 10 ******************************************************************************/
 11 
 12 #include "dlm_internal.h"
 13 #include "lockspace.h"
 14 #include "member.h"
 15 #include "lowcomms.h"
 16 #include "midcomms.h"
 17 #include "rcom.h"
 18 #include "recover.h"
 19 #include "dir.h"
 20 #include "config.h"
 21 #include "memory.h"
 22 #include "lock.h"
 23 #include "util.h"
 24 
 25 static int rcom_response(struct dlm_ls *ls)
 26 {
 27         return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
 28 }
 29 
 30 static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
 31                          struct dlm_rcom **rc_ret, char *mb, int mb_len,
 32                          uint64_t seq)
 33 {
 34         struct dlm_rcom *rc;
 35 
 36         rc = (struct dlm_rcom *) mb;
 37 
 38         rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 39         rc->rc_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
 40         rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
 41         rc->rc_header.h_length = cpu_to_le16(mb_len);
 42         rc->rc_header.h_cmd = DLM_RCOM;
 43 
 44         rc->rc_type = cpu_to_le32(type);
 45         rc->rc_seq = cpu_to_le64(seq);
 46 
 47         *rc_ret = rc;
 48 }
 49 
 50 static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
 51                        struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret,
 52                        uint64_t seq)
 53 {
 54         int mb_len = sizeof(struct dlm_rcom) + len;
 55         struct dlm_mhandle *mh;
 56         char *mb;
 57 
 58         mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
 59         if (!mh) {
 60                 log_print("%s to %d type %d len %d ENOBUFS",
 61                           __func__, to_nodeid, type, len);
 62                 return -ENOBUFS;
 63         }
 64 
 65         _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
 66         *mh_ret = mh;
 67         return 0;
 68 }
 69 
 70 static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
 71                                  int len, struct dlm_rcom **rc_ret,
 72                                  struct dlm_msg **msg_ret, uint64_t seq)
 73 {
 74         int mb_len = sizeof(struct dlm_rcom) + len;
 75         struct dlm_msg *msg;
 76         char *mb;
 77 
 78         msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, &mb, NULL, NULL);
 79         if (!msg) {
 80                 log_print("create_rcom to %d type %d len %d ENOBUFS",
 81                           to_nodeid, type, len);
 82                 return -ENOBUFS;
 83         }
 84 
 85         _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
 86         *msg_ret = msg;
 87         return 0;
 88 }
 89 
 90 static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc)
 91 {
 92         dlm_midcomms_commit_mhandle(mh, NULL, 0);
 93 }
 94 
 95 static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc)
 96 {
 97         dlm_lowcomms_commit_msg(msg);
 98         dlm_lowcomms_put_msg(msg);
 99 }
100 
101 static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
102                             uint32_t flags)
103 {
104         rs->rs_flags = cpu_to_le32(flags);
105 }
106 
107 /* When replying to a status request, a node also sends back its
108    configuration values.  The requesting node then checks that the remote
109    node is configured the same way as itself. */
110 
111 static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
112                             uint32_t num_slots)
113 {
114         rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
115         rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
116 
117         rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
118         rf->rf_num_slots = cpu_to_le16(num_slots);
119         rf->rf_generation =  cpu_to_le32(ls->ls_generation);
120 }
121 
122 static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
123 {
124         struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
125 
126         if ((le32_to_cpu(rc->rc_header.h_version) & 0xFFFF0000) != DLM_HEADER_MAJOR) {
127                 log_error(ls, "version mismatch: %x nodeid %d: %x",
128                           DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
129                           le32_to_cpu(rc->rc_header.h_version));
130                 return -EPROTO;
131         }
132 
133         if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
134             le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
135                 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
136                           ls->ls_lvblen, ls->ls_exflags, nodeid,
137                           le32_to_cpu(rf->rf_lvblen),
138                           le32_to_cpu(rf->rf_lsflags));
139                 return -EPROTO;
140         }
141         return 0;
142 }
143 
144 static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq)
145 {
146         spin_lock_bh(&ls->ls_rcom_spin);
147         *new_seq = cpu_to_le64(++ls->ls_rcom_seq);
148         set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
149         spin_unlock_bh(&ls->ls_rcom_spin);
150 }
151 
152 static void disallow_sync_reply(struct dlm_ls *ls)
153 {
154         spin_lock_bh(&ls->ls_rcom_spin);
155         clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
156         clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
157         spin_unlock_bh(&ls->ls_rcom_spin);
158 }
159 
160 /*
161  * low nodeid gathers one slot value at a time from each node.
162  * it sets need_slots=0, and saves rf_our_slot returned from each
163  * rcom_config.
164  *
165  * other nodes gather all slot values at once from the low nodeid.
166  * they set need_slots=1, and ignore the rf_our_slot returned from each
167  * rcom_config.  they use the rf_num_slots returned from the low
168  * node's rcom_config.
169  */
170 
171 int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags,
172                     uint64_t seq)
173 {
174         struct dlm_rcom *rc;
175         struct dlm_msg *msg;
176         int error = 0;
177 
178         ls->ls_recover_nodeid = nodeid;
179 
180         if (nodeid == dlm_our_nodeid()) {
181                 rc = ls->ls_recover_buf;
182                 rc->rc_result = cpu_to_le32(dlm_recover_status(ls));
183                 goto out;
184         }
185 
186 retry:
187         error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
188                                       sizeof(struct rcom_status), &rc, &msg,
189                                       seq);
190         if (error)
191                 goto out;
192 
193         set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
194 
195         allow_sync_reply(ls, &rc->rc_id);
196         memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
197 
198         send_rcom_stateless(msg, rc);
199 
200         error = dlm_wait_function(ls, &rcom_response);
201         disallow_sync_reply(ls);
202         if (error == -ETIMEDOUT)
203                 goto retry;
204         if (error)
205                 goto out;
206 
207         rc = ls->ls_recover_buf;
208 
209         if (rc->rc_result == cpu_to_le32(-ESRCH)) {
210                 /* we pretend the remote lockspace exists with 0 status */
211                 log_debug(ls, "remote node %d not ready", nodeid);
212                 rc->rc_result = 0;
213                 error = 0;
214         } else {
215                 error = check_rcom_config(ls, rc, nodeid);
216         }
217 
218         /* the caller looks at rc_result for the remote recovery status */
219  out:
220         return error;
221 }
222 
223 static void receive_rcom_status(struct dlm_ls *ls,
224                                 const struct dlm_rcom *rc_in,
225                                 uint64_t seq)
226 {
227         struct dlm_rcom *rc;
228         struct rcom_status *rs;
229         uint32_t status;
230         int nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
231         int len = sizeof(struct rcom_config);
232         struct dlm_msg *msg;
233         int num_slots = 0;
234         int error;
235 
236         if (!dlm_slots_version(&rc_in->rc_header)) {
237                 status = dlm_recover_status(ls);
238                 goto do_create;
239         }
240 
241         rs = (struct rcom_status *)rc_in->rc_buf;
242 
243         if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
244                 status = dlm_recover_status(ls);
245                 goto do_create;
246         }
247 
248         spin_lock_bh(&ls->ls_recover_lock);
249         status = ls->ls_recover_status;
250         num_slots = ls->ls_num_slots;
251         spin_unlock_bh(&ls->ls_recover_lock);
252         len += num_slots * sizeof(struct rcom_slot);
253 
254  do_create:
255         error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
256                                       len, &rc, &msg, seq);
257         if (error)
258                 return;
259 
260         rc->rc_id = rc_in->rc_id;
261         rc->rc_seq_reply = rc_in->rc_seq;
262         rc->rc_result = cpu_to_le32(status);
263 
264         set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
265 
266         if (!num_slots)
267                 goto do_send;
268 
269         spin_lock_bh(&ls->ls_recover_lock);
270         if (ls->ls_num_slots != num_slots) {
271                 spin_unlock_bh(&ls->ls_recover_lock);
272                 log_debug(ls, "receive_rcom_status num_slots %d to %d",
273                           num_slots, ls->ls_num_slots);
274                 rc->rc_result = 0;
275                 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
276                 goto do_send;
277         }
278 
279         dlm_slots_copy_out(ls, rc);
280         spin_unlock_bh(&ls->ls_recover_lock);
281 
282  do_send:
283         send_rcom_stateless(msg, rc);
284 }
285 
286 static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
287 {
288         spin_lock_bh(&ls->ls_rcom_spin);
289         if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
290             le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) {
291                 log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
292                           le32_to_cpu(rc_in->rc_type),
293                           le32_to_cpu(rc_in->rc_header.h_nodeid),
294                           (unsigned long long)le64_to_cpu(rc_in->rc_id),
295                           (unsigned long long)ls->ls_rcom_seq);
296                 goto out;
297         }
298         memcpy(ls->ls_recover_buf, rc_in,
299                le16_to_cpu(rc_in->rc_header.h_length));
300         set_bit(LSFL_RCOM_READY, &ls->ls_flags);
301         clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
302         wake_up(&ls->ls_wait_general);
303  out:
304         spin_unlock_bh(&ls->ls_rcom_spin);
305 }
306 
307 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
308                    int last_len, uint64_t seq)
309 {
310         struct dlm_mhandle *mh;
311         struct dlm_rcom *rc;
312         int error = 0;
313 
314         ls->ls_recover_nodeid = nodeid;
315 
316 retry:
317         error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len,
318                             &rc, &mh, seq);
319         if (error)
320                 goto out;
321         memcpy(rc->rc_buf, last_name, last_len);
322 
323         allow_sync_reply(ls, &rc->rc_id);
324         memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
325 
326         send_rcom(mh, rc);
327 
328         error = dlm_wait_function(ls, &rcom_response);
329         disallow_sync_reply(ls);
330         if (error == -ETIMEDOUT)
331                 goto retry;
332  out:
333         return error;
334 }
335 
336 static void receive_rcom_names(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
337                                uint64_t seq)
338 {
339         struct dlm_mhandle *mh;
340         struct dlm_rcom *rc;
341         int error, inlen, outlen, nodeid;
342 
343         nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
344         inlen = le16_to_cpu(rc_in->rc_header.h_length) -
345                 sizeof(struct dlm_rcom);
346         outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
347 
348         error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
349                             &rc, &mh, seq);
350         if (error)
351                 return;
352         rc->rc_id = rc_in->rc_id;
353         rc->rc_seq_reply = rc_in->rc_seq;
354 
355         dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
356                               nodeid);
357         send_rcom(mh, rc);
358 }
359 
360 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq)
361 {
362         struct dlm_rcom *rc;
363         struct dlm_mhandle *mh;
364         struct dlm_ls *ls = r->res_ls;
365         int error;
366 
367         error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
368                             &rc, &mh, seq);
369         if (error)
370                 goto out;
371         memcpy(rc->rc_buf, r->res_name, r->res_length);
372         rc->rc_id = cpu_to_le64(r->res_id);
373 
374         send_rcom(mh, rc);
375  out:
376         return error;
377 }
378 
379 static void receive_rcom_lookup(struct dlm_ls *ls,
380                                 const struct dlm_rcom *rc_in, uint64_t seq)
381 {
382         struct dlm_rcom *rc;
383         struct dlm_mhandle *mh;
384         int error, ret_nodeid, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
385         int len = le16_to_cpu(rc_in->rc_header.h_length) -
386                 sizeof(struct dlm_rcom);
387 
388         /* Old code would send this special id to trigger a debug dump. */
389         if (rc_in->rc_id == cpu_to_le64(0xFFFFFFFF)) {
390                 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
391                 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
392                 return;
393         }
394 
395         error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh,
396                             seq);
397         if (error)
398                 return;
399 
400         error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
401                                   DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
402         if (error)
403                 ret_nodeid = error;
404         rc->rc_result = cpu_to_le32(ret_nodeid);
405         rc->rc_id = rc_in->rc_id;
406         rc->rc_seq_reply = rc_in->rc_seq;
407 
408         send_rcom(mh, rc);
409 }
410 
411 static void receive_rcom_lookup_reply(struct dlm_ls *ls,
412                                       const struct dlm_rcom *rc_in)
413 {
414         dlm_recover_master_reply(ls, rc_in);
415 }
416 
417 static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
418                            struct rcom_lock *rl)
419 {
420         memset(rl, 0, sizeof(*rl));
421 
422         rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
423         rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
424         rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
425         rl->rl_flags = cpu_to_le32(dlm_dflags_val(lkb));
426         rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
427         rl->rl_rqmode = lkb->lkb_rqmode;
428         rl->rl_grmode = lkb->lkb_grmode;
429         rl->rl_status = lkb->lkb_status;
430         rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
431 
432         if (lkb->lkb_bastfn)
433                 rl->rl_asts |= DLM_CB_BAST;
434         if (lkb->lkb_astfn)
435                 rl->rl_asts |= DLM_CB_CAST;
436 
437         rl->rl_namelen = cpu_to_le16(r->res_length);
438         memcpy(rl->rl_name, r->res_name, r->res_length);
439 
440         /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
441            If so, receive_rcom_lock_args() won't take this copy. */
442 
443         if (lkb->lkb_lvbptr)
444                 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
445 }
446 
447 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq)
448 {
449         struct dlm_ls *ls = r->res_ls;
450         struct dlm_rcom *rc;
451         struct dlm_mhandle *mh;
452         struct rcom_lock *rl;
453         int error, len = sizeof(struct rcom_lock);
454 
455         if (lkb->lkb_lvbptr)
456                 len += ls->ls_lvblen;
457 
458         error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh,
459                             seq);
460         if (error)
461                 goto out;
462 
463         rl = (struct rcom_lock *) rc->rc_buf;
464         pack_rcom_lock(r, lkb, rl);
465         rc->rc_id = cpu_to_le64((uintptr_t)r);
466 
467         send_rcom(mh, rc);
468  out:
469         return error;
470 }
471 
472 /* needs at least dlm_rcom + rcom_lock */
473 static void receive_rcom_lock(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
474                               uint64_t seq)
475 {
476         __le32 rl_remid, rl_result;
477         struct rcom_lock *rl;
478         struct dlm_rcom *rc;
479         struct dlm_mhandle *mh;
480         int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
481 
482         dlm_recover_master_copy(ls, rc_in, &rl_remid, &rl_result);
483 
484         error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
485                             sizeof(struct rcom_lock), &rc, &mh, seq);
486         if (error)
487                 return;
488 
489         memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
490         rl = (struct rcom_lock *)rc->rc_buf;
491         /* set rl_remid and rl_result from dlm_recover_master_copy() */
492         rl->rl_remid = rl_remid;
493         rl->rl_result = rl_result;
494 
495         rc->rc_id = rc_in->rc_id;
496         rc->rc_seq_reply = rc_in->rc_seq;
497 
498         send_rcom(mh, rc);
499 }
500 
501 /* If the lockspace doesn't exist then still send a status message
502    back; it's possible that it just doesn't have its global_id yet. */
503 
504 int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in)
505 {
506         struct dlm_rcom *rc;
507         struct rcom_config *rf;
508         struct dlm_mhandle *mh;
509         char *mb;
510         int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
511 
512         mh = dlm_midcomms_get_mhandle(nodeid, mb_len, &mb);
513         if (!mh)
514                 return -ENOBUFS;
515 
516         rc = (struct dlm_rcom *) mb;
517 
518         rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
519         rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace;
520         rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
521         rc->rc_header.h_length = cpu_to_le16(mb_len);
522         rc->rc_header.h_cmd = DLM_RCOM;
523 
524         rc->rc_type = cpu_to_le32(DLM_RCOM_STATUS_REPLY);
525         rc->rc_id = rc_in->rc_id;
526         rc->rc_seq_reply = rc_in->rc_seq;
527         rc->rc_result = cpu_to_le32(-ESRCH);
528 
529         rf = (struct rcom_config *) rc->rc_buf;
530         rf->rf_lvblen = cpu_to_le32(~0U);
531 
532         dlm_midcomms_commit_mhandle(mh, NULL, 0);
533 
534         return 0;
535 }
536 
537 /*
538  * Ignore messages for stage Y before we set
539  * recover_status bit for stage X:
540  *
541  * recover_status = 0
542  *
543  * dlm_recover_members()
544  * - send nothing
545  * - recv nothing
546  * - ignore NAMES, NAMES_REPLY
547  * - ignore LOOKUP, LOOKUP_REPLY
548  * - ignore LOCK, LOCK_REPLY
549  *
550  * recover_status |= NODES
551  *
552  * dlm_recover_members_wait()
553  *
554  * dlm_recover_directory()
555  * - send NAMES
556  * - recv NAMES_REPLY
557  * - ignore LOOKUP, LOOKUP_REPLY
558  * - ignore LOCK, LOCK_REPLY
559  *
560  * recover_status |= DIR
561  *
562  * dlm_recover_directory_wait()
563  *
564  * dlm_recover_masters()
565  * - send LOOKUP
566  * - recv LOOKUP_REPLY
567  *
568  * dlm_recover_locks()
569  * - send LOCKS
570  * - recv LOCKS_REPLY
571  *
572  * recover_status |= LOCKS
573  *
574  * dlm_recover_locks_wait()
575  *
576  * recover_status |= DONE
577  */
578 
579 /* Called by dlm_recv; corresponds to dlm_receive_message() but special
580    recovery-only comms are sent through here. */
581 
582 void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid)
583 {
584         int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
585         int stop, reply = 0, names = 0, lookup = 0, lock = 0;
586         uint32_t status;
587         uint64_t seq;
588 
589         switch (rc->rc_type) {
590         case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
591                 reply = 1;
592                 break;
593         case cpu_to_le32(DLM_RCOM_NAMES):
594                 names = 1;
595                 break;
596         case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
597                 names = 1;
598                 reply = 1;
599                 break;
600         case cpu_to_le32(DLM_RCOM_LOOKUP):
601                 lookup = 1;
602                 break;
603         case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
604                 lookup = 1;
605                 reply = 1;
606                 break;
607         case cpu_to_le32(DLM_RCOM_LOCK):
608                 lock = 1;
609                 break;
610         case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
611                 lock = 1;
612                 reply = 1;
613                 break;
614         }
615 
616         spin_lock_bh(&ls->ls_recover_lock);
617         status = ls->ls_recover_status;
618         stop = dlm_recovery_stopped(ls);
619         seq = ls->ls_recover_seq;
620         spin_unlock_bh(&ls->ls_recover_lock);
621 
622         if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS)))
623                 goto ignore;
624 
625         if (reply && (le64_to_cpu(rc->rc_seq_reply) != seq))
626                 goto ignore;
627 
628         if (!(status & DLM_RS_NODES) && (names || lookup || lock))
629                 goto ignore;
630 
631         if (!(status & DLM_RS_DIR) && (lookup || lock))
632                 goto ignore;
633 
634         switch (rc->rc_type) {
635         case cpu_to_le32(DLM_RCOM_STATUS):
636                 receive_rcom_status(ls, rc, seq);
637                 break;
638 
639         case cpu_to_le32(DLM_RCOM_NAMES):
640                 receive_rcom_names(ls, rc, seq);
641                 break;
642 
643         case cpu_to_le32(DLM_RCOM_LOOKUP):
644                 receive_rcom_lookup(ls, rc, seq);
645                 break;
646 
647         case cpu_to_le32(DLM_RCOM_LOCK):
648                 if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
649                         goto Eshort;
650                 receive_rcom_lock(ls, rc, seq);
651                 break;
652 
653         case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
654                 receive_sync_reply(ls, rc);
655                 break;
656 
657         case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
658                 receive_sync_reply(ls, rc);
659                 break;
660 
661         case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
662                 receive_rcom_lookup_reply(ls, rc);
663                 break;
664 
665         case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
666                 if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
667                         goto Eshort;
668                 dlm_recover_process_copy(ls, rc, seq);
669                 break;
670 
671         default:
672                 log_error(ls, "receive_rcom bad type %d",
673                           le32_to_cpu(rc->rc_type));
674         }
675         return;
676 
677 ignore:
678         log_limit(ls, "dlm_receive_rcom ignore msg %d "
679                   "from %d %llu %llu recover seq %llu sts %x gen %u",
680                    le32_to_cpu(rc->rc_type),
681                    nodeid,
682                    (unsigned long long)le64_to_cpu(rc->rc_seq),
683                    (unsigned long long)le64_to_cpu(rc->rc_seq_reply),
684                    (unsigned long long)seq,
685                    status, ls->ls_generation);
686         return;
687 Eshort:
688         log_error(ls, "recovery message %d from %d is too short",
689                   le32_to_cpu(rc->rc_type), nodeid);
690 }
691 
692 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

kernel.org | git.kernel.org | LWN.net | Project Home | SVN repository | Mail admin

Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.

sflogo.php