~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

TOMOYO Linux Cross Reference
Linux/net/ceph/mon_client.c

Version: ~ [ linux-6.11.5 ] ~ [ linux-6.10.14 ] ~ [ linux-6.9.12 ] ~ [ linux-6.8.12 ] ~ [ linux-6.7.12 ] ~ [ linux-6.6.58 ] ~ [ linux-6.5.13 ] ~ [ linux-6.4.16 ] ~ [ linux-6.3.13 ] ~ [ linux-6.2.16 ] ~ [ linux-6.1.114 ] ~ [ linux-6.0.19 ] ~ [ linux-5.19.17 ] ~ [ linux-5.18.19 ] ~ [ linux-5.17.15 ] ~ [ linux-5.16.20 ] ~ [ linux-5.15.169 ] ~ [ linux-5.14.21 ] ~ [ linux-5.13.19 ] ~ [ linux-5.12.19 ] ~ [ linux-5.11.22 ] ~ [ linux-5.10.228 ] ~ [ linux-5.9.16 ] ~ [ linux-5.8.18 ] ~ [ linux-5.7.19 ] ~ [ linux-5.6.19 ] ~ [ linux-5.5.19 ] ~ [ linux-5.4.284 ] ~ [ linux-5.3.18 ] ~ [ linux-5.2.21 ] ~ [ linux-5.1.21 ] ~ [ linux-5.0.21 ] ~ [ linux-4.20.17 ] ~ [ linux-4.19.322 ] ~ [ linux-4.18.20 ] ~ [ linux-4.17.19 ] ~ [ linux-4.16.18 ] ~ [ linux-4.15.18 ] ~ [ linux-4.14.336 ] ~ [ linux-4.13.16 ] ~ [ linux-4.12.14 ] ~ [ linux-4.11.12 ] ~ [ linux-4.10.17 ] ~ [ linux-4.9.337 ] ~ [ linux-4.4.302 ] ~ [ linux-3.10.108 ] ~ [ linux-2.6.32.71 ] ~ [ linux-2.6.0 ] ~ [ linux-2.4.37.11 ] ~ [ unix-v6-master ] ~ [ ccs-tools-1.8.9 ] ~ [ policy-sample ] ~
Architecture: ~ [ i386 ] ~ [ alpha ] ~ [ m68k ] ~ [ mips ] ~ [ ppc ] ~ [ sparc ] ~ [ sparc64 ] ~

  1 // SPDX-License-Identifier: GPL-2.0
  2 #include <linux/ceph/ceph_debug.h>
  3 
  4 #include <linux/module.h>
  5 #include <linux/types.h>
  6 #include <linux/slab.h>
  7 #include <linux/random.h>
  8 #include <linux/sched.h>
  9 
 10 #include <linux/ceph/ceph_features.h>
 11 #include <linux/ceph/mon_client.h>
 12 #include <linux/ceph/libceph.h>
 13 #include <linux/ceph/debugfs.h>
 14 #include <linux/ceph/decode.h>
 15 #include <linux/ceph/auth.h>
 16 
 17 /*
 18  * Interact with Ceph monitor cluster.  Handle requests for new map
 19  * versions, and periodically resend as needed.  Also implement
 20  * statfs() and umount().
 21  *
 22  * A small cluster of Ceph "monitors" are responsible for managing critical
 23  * cluster configuration and state information.  An odd number (e.g., 3, 5)
 24  * of cmon daemons use a modified version of the Paxos part-time parliament
 25  * algorithm to manage the MDS map (mds cluster membership), OSD map, and
 26  * list of clients who have mounted the file system.
 27  *
 28  * We maintain an open, active session with a monitor at all times in order to
 29  * receive timely MDSMap updates.  We periodically send a keepalive byte on the
 30  * TCP socket to ensure we detect a failure.  If the connection does break, we
 31  * randomly hunt for a new monitor.  Once the connection is reestablished, we
 32  * resend any outstanding requests.
 33  */
 34 
 35 static const struct ceph_connection_operations mon_con_ops;
 36 
 37 static int __validate_auth(struct ceph_mon_client *monc);
 38 
/*
 * Decode a single mon_info_t, extracting only the monitor address.
 *
 * On success, *p is advanced to the end of the encoded mon_info_t
 * (using its encoded struct_len), so any fields added by newer
 * encodings are skipped.  Returns 0 or a negative error.
 */
static int decode_mon_info(void **p, void *end, bool msgr2,
                           struct ceph_entity_addr *addr)
{
        void *mon_info_end;
        u32 struct_len;
        u8 struct_v;
        int ret;

        ret = ceph_start_decoding(p, end, 1, "mon_info_t", &struct_v,
                                  &struct_len);
        if (ret)
                return ret;

        /* remember where this mon_info_t ends so we can skip past it */
        mon_info_end = *p + struct_len;
        ceph_decode_skip_string(p, end, e_inval);  /* skip mon name */
        ret = ceph_decode_entity_addrvec(p, end, msgr2, addr);
        if (ret)
                return ret;

        /* jump over any trailing fields we don't care about */
        *p = mon_info_end;
        return 0;

e_inval:
        return -EINVAL;
}
 64 
/*
 * Decode a monmap blob (e.g., during mount).
 *
 * Assume MonMap v3 (i.e. encoding with MONNAMES and MONENC).
 *
 * Returns a freshly allocated monmap (caller frees with kfree()) or an
 * ERR_PTR() on decode/allocation failure.
 */
static struct ceph_monmap *ceph_monmap_decode(void **p, void *end, bool msgr2)
{
        struct ceph_monmap *monmap = NULL;
        struct ceph_fsid fsid;
        u32 struct_len;
        int blob_len;
        int num_mon;
        u8 struct_v;
        u32 epoch;
        int ret;
        int i;

        /* the monmap is wrapped in a length-prefixed blob */
        ceph_decode_32_safe(p, end, blob_len, e_inval);
        ceph_decode_need(p, end, blob_len, e_inval);

        ret = ceph_start_decoding(p, end, 6, "monmap", &struct_v, &struct_len);
        if (ret)
                goto fail;

        dout("%s struct_v %d\n", __func__, struct_v);
        ceph_decode_copy_safe(p, end, &fsid, sizeof(fsid), e_inval);
        ceph_decode_32_safe(p, end, epoch, e_inval);
        if (struct_v >= 6) {
                u32 feat_struct_len;
                u8 feat_struct_v;

                /* v6+ carries extra fields we don't need; skip them */
                *p += sizeof(struct ceph_timespec);  /* skip last_changed */
                *p += sizeof(struct ceph_timespec);  /* skip created */

                ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
                                          &feat_struct_v, &feat_struct_len);
                if (ret)
                        goto fail;

                *p += feat_struct_len;  /* skip persistent_features */

                ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
                                          &feat_struct_v, &feat_struct_len);
                if (ret)
                        goto fail;

                *p += feat_struct_len;  /* skip optional_features */
        }
        ceph_decode_32_safe(p, end, num_mon, e_inval);

        dout("%s fsid %pU epoch %u num_mon %d\n", __func__, &fsid, epoch,
             num_mon);
        if (num_mon > CEPH_MAX_MON)
                goto e_inval;

        /* struct_size() sizes the trailing mon_inst[] flexible array */
        monmap = kmalloc(struct_size(monmap, mon_inst, num_mon), GFP_NOIO);
        if (!monmap) {
                ret = -ENOMEM;
                goto fail;
        }
        monmap->fsid = fsid;
        monmap->epoch = epoch;
        monmap->num_mon = num_mon;

        /* legacy_mon_addr map or mon_info map */
        for (i = 0; i < num_mon; i++) {
                struct ceph_entity_inst *inst = &monmap->mon_inst[i];

                ceph_decode_skip_string(p, end, e_inval);  /* skip mon name */
                inst->name.type = CEPH_ENTITY_TYPE_MON;
                inst->name.num = cpu_to_le64(i);

                /* v6+ wraps each address in a mon_info_t */
                if (struct_v >= 6)
                        ret = decode_mon_info(p, end, msgr2, &inst->addr);
                else
                        ret = ceph_decode_entity_addr(p, end, &inst->addr);
                if (ret)
                        goto fail;

                dout("%s mon%d addr %s\n", __func__, i,
                     ceph_pr_addr(&inst->addr));
        }

        return monmap;

e_inval:
        ret = -EINVAL;
fail:
        kfree(monmap);  /* kfree(NULL) is a no-op */
        return ERR_PTR(ret);
}
156 
157 /*
158  * return true if *addr is included in the monmap.
159  */
160 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
161 {
162         int i;
163 
164         for (i = 0; i < m->num_mon; i++) {
165                 if (ceph_addr_equal_no_type(addr, &m->mon_inst[i].addr))
166                         return 1;
167         }
168 
169         return 0;
170 }
171 
172 /*
173  * Send an auth request.
174  */
175 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
176 {
177         monc->pending_auth = 1;
178         monc->m_auth->front.iov_len = len;
179         monc->m_auth->hdr.front_len = cpu_to_le32(len);
180         ceph_msg_revoke(monc->m_auth);
181         ceph_msg_get(monc->m_auth);  /* keep our ref */
182         ceph_con_send(&monc->con, monc->m_auth);
183 }
184 
/*
 * Close monitor session, if any.
 *
 * Revokes the reusable outgoing/incoming messages so they can be
 * safely resent on the next session, closes the connection, and
 * resets auth state.  Callers in this file hold monc->mutex.
 */
static void __close_session(struct ceph_mon_client *monc)
{
        dout("__close_session closing mon%d\n", monc->cur_mon);
        ceph_msg_revoke(monc->m_auth);
        ceph_msg_revoke_incoming(monc->m_auth_reply);
        ceph_msg_revoke(monc->m_subscribe);
        ceph_msg_revoke_incoming(monc->m_subscribe_ack);
        ceph_con_close(&monc->con);

        monc->pending_auth = 0;
        ceph_auth_reset(monc->auth);
}
200 
201 /*
202  * Pick a new monitor at random and set cur_mon.  If we are repicking
203  * (i.e. cur_mon is already set), be sure to pick a different one.
204  */
205 static void pick_new_mon(struct ceph_mon_client *monc)
206 {
207         int old_mon = monc->cur_mon;
208 
209         BUG_ON(monc->monmap->num_mon < 1);
210 
211         if (monc->monmap->num_mon == 1) {
212                 monc->cur_mon = 0;
213         } else {
214                 int max = monc->monmap->num_mon;
215                 int o = -1;
216                 int n;
217 
218                 if (monc->cur_mon >= 0) {
219                         if (monc->cur_mon < monc->monmap->num_mon)
220                                 o = monc->cur_mon;
221                         if (o >= 0)
222                                 max--;
223                 }
224 
225                 n = get_random_u32_below(max);
226                 if (o >= 0 && n >= o)
227                         n++;
228 
229                 monc->cur_mon = n;
230         }
231 
232         dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
233              monc->cur_mon, monc->monmap->num_mon);
234 }
235 
/*
 * Open a session with a new monitor.
 *
 * Picks a (different) mon, enters hunting mode with exponential
 * backoff, opens the connection, and kicks off authentication.
 * Callers in this file hold monc->mutex.
 */
static void __open_session(struct ceph_mon_client *monc)
{
        int ret;

        pick_new_mon(monc);

        /* we stay in hunting mode until this session is confirmed */
        monc->hunting = true;
        if (monc->had_a_connection) {
                /* exponential backoff, capped at HUNT_MAX_MULT */
                monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
                if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
                        monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
        }

        monc->sub_renew_after = jiffies; /* i.e., expired */
        monc->sub_renew_sent = 0;

        dout("%s opening mon%d\n", __func__, monc->cur_mon);
        ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
                      &monc->monmap->mon_inst[monc->cur_mon].addr);

        /*
         * Queue a keepalive to ensure that in case of an early fault
         * the messenger doesn't put us into STANDBY state and instead
         * retries.  This also ensures that our timestamp is valid by
         * the time we finish hunting and delayed_work() checks it.
         */
        ceph_con_keepalive(&monc->con);
        if (ceph_msgr2(monc->client)) {
                /*
                 * No hello here; just mark auth as pending.  NOTE(review):
                 * presumably the msgr2 messenger drives the auth exchange
                 * itself -- confirm against the messenger implementation.
                 */
                monc->pending_auth = 1;
                return;
        }

        /* initiate authentication handshake */
        ret = ceph_auth_build_hello(monc->auth,
                                    monc->m_auth->front.iov_base,
                                    monc->m_auth->front_alloc_len);
        BUG_ON(ret <= 0);
        __send_prepared_auth_request(monc, ret);
}
278 
/*
 * Tear down the current session and start hunting for a (possibly
 * different) monitor.  Callers hold monc->mutex.
 */
static void reopen_session(struct ceph_mon_client *monc)
{
        /* log only on the transition into hunting (hunting is set in
         * __open_session()), not on every retry */
        if (!monc->hunting)
                pr_info("mon%d %s session lost, hunting for new mon\n",
                    monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));

        __close_session(monc);
        __open_session(monc);
}
288 
/* Locked wrapper around reopen_session() for external callers. */
void ceph_monc_reopen_session(struct ceph_mon_client *monc)
{
        mutex_lock(&monc->mutex);
        reopen_session(monc);
        mutex_unlock(&monc->mutex);
}
295 
296 static void un_backoff(struct ceph_mon_client *monc)
297 {
298         monc->hunt_mult /= 2; /* reduce by 50% */
299         if (monc->hunt_mult < 1)
300                 monc->hunt_mult = 1;
301         dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
302 }
303 
304 /*
305  * Reschedule delayed work timer.
306  */
307 static void __schedule_delayed(struct ceph_mon_client *monc)
308 {
309         unsigned long delay;
310 
311         if (monc->hunting)
312                 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
313         else
314                 delay = CEPH_MONC_PING_INTERVAL;
315 
316         dout("__schedule_delayed after %lu\n", delay);
317         mod_delayed_work(system_wq, &monc->delayed_work,
318                          round_jiffies_relative(delay));
319 }
320 
/*
 * Subscription names as encoded on the wire by __send_subscribe();
 * indices must line up with the CEPH_SUB_* constants.
 */
const char *ceph_sub_str[] = {
        [CEPH_SUB_MONMAP] = "monmap",
        [CEPH_SUB_OSDMAP] = "osdmap",
        [CEPH_SUB_FSMAP]  = "fsmap.user",
        [CEPH_SUB_MDSMAP] = "mdsmap",
};
327 
328 /*
329  * Send subscribe request for one or more maps, according to
330  * monc->subs.
331  */
332 static void __send_subscribe(struct ceph_mon_client *monc)
333 {
334         struct ceph_msg *msg = monc->m_subscribe;
335         void *p = msg->front.iov_base;
336         void *const end = p + msg->front_alloc_len;
337         int num = 0;
338         int i;
339 
340         dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
341 
342         BUG_ON(monc->cur_mon < 0);
343 
344         if (!monc->sub_renew_sent)
345                 monc->sub_renew_sent = jiffies | 1; /* never 0 */
346 
347         msg->hdr.version = cpu_to_le16(2);
348 
349         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
350                 if (monc->subs[i].want)
351                         num++;
352         }
353         BUG_ON(num < 1); /* monmap sub is always there */
354         ceph_encode_32(&p, num);
355         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
356                 char buf[32];
357                 int len;
358 
359                 if (!monc->subs[i].want)
360                         continue;
361 
362                 len = sprintf(buf, "%s", ceph_sub_str[i]);
363                 if (i == CEPH_SUB_MDSMAP &&
364                     monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
365                         len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
366 
367                 dout("%s %s start %llu flags 0x%x\n", __func__, buf,
368                      le64_to_cpu(monc->subs[i].item.start),
369                      monc->subs[i].item.flags);
370                 ceph_encode_string(&p, end, buf, len);
371                 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
372                 p += sizeof(monc->subs[i].item);
373         }
374 
375         BUG_ON(p > end);
376         msg->front.iov_len = p - msg->front.iov_base;
377         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
378         ceph_msg_revoke(msg);
379         ceph_con_send(&monc->con, ceph_msg_get(msg));
380 }
381 
/*
 * Handle MMonSubscribeAck: record when the subscription must be
 * renewed and clear the "renew in flight" marker.
 */
static void handle_subscribe_ack(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        unsigned int seconds;
        struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        seconds = le32_to_cpu(h->duration);

        mutex_lock(&monc->mutex);
        if (monc->sub_renew_sent) {
                /*
                 * This is only needed for legacy (infernalis or older)
                 * MONs -- see delayed_work().
                 */
                /* renew halfway through the advertised duration */
                monc->sub_renew_after = monc->sub_renew_sent +
                                            (seconds >> 1) * HZ - 1;
                dout("%s sent %lu duration %d renew after %lu\n", __func__,
                     monc->sub_renew_sent, seconds, monc->sub_renew_after);
                monc->sub_renew_sent = 0;
        } else {
                /* no renew in flight (already acked, or session reset) */
                dout("%s sent %lu renew after %lu, ignoring\n", __func__,
                     monc->sub_renew_sent, monc->sub_renew_after);
        }
        mutex_unlock(&monc->mutex);
        return;
bad:
        pr_err("got corrupt subscribe-ack msg\n");
        ceph_msg_dump(msg);
}
413 
414 /*
415  * Register interest in a map
416  *
417  * @sub: one of CEPH_SUB_*
418  * @epoch: X for "every map since X", or 0 for "just the latest"
419  */
420 static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
421                                  u32 epoch, bool continuous)
422 {
423         __le64 start = cpu_to_le64(epoch);
424         u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
425 
426         dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
427              epoch, continuous);
428 
429         if (monc->subs[sub].want &&
430             monc->subs[sub].item.start == start &&
431             monc->subs[sub].item.flags == flags)
432                 return false;
433 
434         monc->subs[sub].item.start = start;
435         monc->subs[sub].item.flags = flags;
436         monc->subs[sub].want = true;
437 
438         return true;
439 }
440 
/*
 * Locked wrapper around __ceph_monc_want_map().  Returns true if the
 * caller should trigger a subscribe request (see __send_subscribe()).
 */
bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
                        bool continuous)
{
        bool need_request;

        mutex_lock(&monc->mutex);
        need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
        mutex_unlock(&monc->mutex);

        return need_request;
}
EXPORT_SYMBOL(ceph_monc_want_map);
453 
454 /*
455  * Keep track of which maps we have
456  *
457  * @sub: one of CEPH_SUB_*
458  */
459 static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
460                                 u32 epoch)
461 {
462         dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
463 
464         if (monc->subs[sub].want) {
465                 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
466                         monc->subs[sub].want = false;
467                 else
468                         monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
469         }
470 
471         monc->subs[sub].have = epoch;
472 }
473 
/* Locked wrapper around __ceph_monc_got_map() for external callers. */
void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
        mutex_lock(&monc->mutex);
        __ceph_monc_got_map(monc, sub, epoch);
        mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);
481 
/* (Re)send the current subscriptions under monc->mutex. */
void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
        mutex_lock(&monc->mutex);
        __send_subscribe(monc);
        mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_renew_subs);
489 
/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 *
 * Returns 0 on success, -ETIMEDOUT, or a negative error if the wait
 * was interrupted.
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
                          unsigned long timeout)
{
        unsigned long started = jiffies;
        long ret;

        mutex_lock(&monc->mutex);
        while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
                mutex_unlock(&monc->mutex);

                /* enforce the overall deadline across multiple wakeups */
                if (timeout && time_after_eq(jiffies, started + timeout))
                        return -ETIMEDOUT;

                /*
                 * The wait condition reads ->have without monc->mutex;
                 * the loop re-checks it under the mutex after waking.
                 */
                ret = wait_event_interruptible_timeout(monc->client->auth_wq,
                                     monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
                                     ceph_timeout_jiffies(timeout));
                if (ret < 0)
                        return ret;

                mutex_lock(&monc->mutex);
        }

        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);
522 
/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
        mutex_lock(&monc->mutex);
        /* monmap continuously, osdmap just once to bootstrap */
        __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
        __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
        __open_session(monc);
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);
538 
/*
 * Handle an incoming monmap message: decode it, verify the fsid, and
 * swap it in as the current monmap.  Always wakes auth_wq waiters so
 * mount-time waits can re-evaluate (cf. __ceph_open_session()).
 */
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        struct ceph_client *client = monc->client;
        struct ceph_monmap *monmap;
        void *p, *end;

        mutex_lock(&monc->mutex);

        dout("handle_monmap\n");
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        monmap = ceph_monmap_decode(&p, end, ceph_msgr2(client));
        if (IS_ERR(monmap)) {
                pr_err("problem decoding monmap, %d\n",
                       (int)PTR_ERR(monmap));
                ceph_msg_dump(msg);
                goto out;
        }

        /* reject a monmap from a different cluster */
        if (ceph_check_fsid(client, &monmap->fsid) < 0) {
                kfree(monmap);
                goto out;
        }

        /* replace the old monmap; we own the new allocation now */
        kfree(monc->monmap);
        monc->monmap = monmap;

        __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
        client->have_fsid = true;

out:
        mutex_unlock(&monc->mutex);
        wake_up_all(&client->auth_wq);
}
575 
/*
 * generic requests (currently statfs, mon_get_version)
 *
 * DEFINE_RB_FUNCS generates the insert_/lookup_/erase_generic_request()
 * rb-tree helpers used below, keyed by tid via the embedded node.
 */
DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
580 
/*
 * kref release callback: free the request and drop its message refs.
 * The request must already be off the tree (see the WARN_ON).
 */
static void release_generic_request(struct kref *kref)
{
        struct ceph_mon_generic_request *req =
                container_of(kref, struct ceph_mon_generic_request, kref);

        dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
             req->reply);
        WARN_ON(!RB_EMPTY_NODE(&req->node));

        if (req->reply)
                ceph_msg_put(req->reply);
        if (req->request)
                ceph_msg_put(req->request);

        kfree(req);
}
597 
/* Drop a ref; NULL-safe so error paths can call it unconditionally. */
static void put_generic_request(struct ceph_mon_generic_request *req)
{
        if (req)
                kref_put(&req->kref, release_generic_request);
}
603 
/* Take an additional ref on @req. */
static void get_generic_request(struct ceph_mon_generic_request *req)
{
        kref_get(&req->kref);
}
608 
609 static struct ceph_mon_generic_request *
610 alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
611 {
612         struct ceph_mon_generic_request *req;
613 
614         req = kzalloc(sizeof(*req), gfp);
615         if (!req)
616                 return NULL;
617 
618         req->monc = monc;
619         kref_init(&req->kref);
620         RB_CLEAR_NODE(&req->node);
621         init_completion(&req->completion);
622 
623         dout("%s greq %p\n", __func__, req);
624         return req;
625 }
626 
/*
 * Assign a tid and insert @req into the request tree.  Takes an extra
 * ref that the tree owns (dropped by finish/complete paths).  Callers
 * hold monc->mutex.
 */
static void register_generic_request(struct ceph_mon_generic_request *req)
{
        struct ceph_mon_client *monc = req->monc;

        WARN_ON(req->tid);

        get_generic_request(req);
        req->tid = ++monc->last_tid;
        insert_generic_request(&monc->generic_request_tree, req);
}
637 
/*
 * Send a registered request; the connection takes its own msg ref.
 * Callers hold monc->mutex.
 */
static void send_generic_request(struct ceph_mon_client *monc,
                                 struct ceph_mon_generic_request *req)
{
        WARN_ON(!req->tid);

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
        req->request->hdr.tid = cpu_to_le64(req->tid);
        ceph_con_send(&monc->con, ceph_msg_get(req->request));
}
647 
/*
 * Remove @req from the tree and revoke its in-flight messages.  Does
 * NOT drop the tree's ref -- the caller does (see finish/complete).
 * Callers hold monc->mutex.
 */
static void __finish_generic_request(struct ceph_mon_generic_request *req)
{
        struct ceph_mon_client *monc = req->monc;

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
        erase_generic_request(&monc->generic_request_tree, req);

        ceph_msg_revoke(req->request);
        ceph_msg_revoke_incoming(req->reply);
}
658 
/* Unregister @req and drop the ref taken by register_generic_request(). */
static void finish_generic_request(struct ceph_mon_generic_request *req)
{
        __finish_generic_request(req);
        put_generic_request(req);
}
664 
/*
 * Deliver the result: invoke the async callback if one was set,
 * otherwise wake synchronous waiters.  Drops a ref either way.
 */
static void complete_generic_request(struct ceph_mon_generic_request *req)
{
        if (req->complete_cb)
                req->complete_cb(req);
        else
                complete_all(&req->completion);
        put_generic_request(req);
}
673 
/*
 * Cancel an in-flight request (e.g. interrupted wait).  The lookup
 * guards against a reply handler having already finished it, which
 * would otherwise lead to a double finish.
 */
static void cancel_generic_request(struct ceph_mon_generic_request *req)
{
        struct ceph_mon_client *monc = req->monc;
        struct ceph_mon_generic_request *lookup_req;

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);

        mutex_lock(&monc->mutex);
        lookup_req = lookup_generic_request(&monc->generic_request_tree,
                                            req->tid);
        if (lookup_req) {
                WARN_ON(lookup_req != req);
                finish_generic_request(req);
        }

        mutex_unlock(&monc->mutex);
}
691 
/*
 * Block until @req completes.  On interruption, cancel the request
 * and return the wait error; otherwise return the request's result.
 */
static int wait_generic_request(struct ceph_mon_generic_request *req)
{
        int ret;

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
        ret = wait_for_completion_interruptible(&req->completion);
        if (ret)
                cancel_generic_request(req);
        else
                ret = req->result; /* completed */

        return ret;
}
705 
/*
 * Messenger callback: supply the preallocated reply message for an
 * incoming generic reply, or set *skip if the tid is unknown (e.g.
 * the request was canceled).
 */
static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
                                         struct ceph_msg_header *hdr,
                                         int *skip)
{
        struct ceph_mon_client *monc = con->private;
        struct ceph_mon_generic_request *req;
        u64 tid = le64_to_cpu(hdr->tid);
        struct ceph_msg *m;

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, tid);
        if (!req) {
                dout("get_generic_reply %lld dne\n", tid);
                *skip = 1;
                m = NULL;
        } else {
                dout("get_generic_reply %lld got %p\n", tid, req->reply);
                *skip = 0;
                m = ceph_msg_get(req->reply);
                /*
                 * we don't need to track the connection reading into
                 * this reply because we only have one open connection
                 * at a time, ever.
                 */
        }
        mutex_unlock(&monc->mutex);
        return m;
}
734 
/*
 * statfs
 */
/*
 * Handle a statfs reply: copy the stats into the waiter's buffer and
 * complete the matching generic request.
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
                                struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
        u64 tid = le64_to_cpu(msg->hdr.tid);

        dout("%s msg %p tid %llu\n", __func__, msg, tid);

        /* reply must be exactly one ceph_mon_statfs_reply */
        if (msg->front.iov_len != sizeof(*reply))
                goto bad;

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, tid);
        if (!req) {
                /* unknown tid: canceled or duplicate reply; ignore */
                mutex_unlock(&monc->mutex);
                return;
        }

        req->result = 0;
        *req->u.st = reply->st; /* struct */
        __finish_generic_request(req);
        mutex_unlock(&monc->mutex);

        /* wakes the waiter (or runs the callback) and drops a ref */
        complete_generic_request(req);
        return;

bad:
        pr_err("corrupt statfs reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}
769 
/*
 * Do a synchronous statfs().
 *
 * @data_pool: pool to query, or CEPH_NOPOOL for cluster-wide stats
 * @buf: filled in on success
 *
 * Returns 0 on success or a negative error (including -ENOMEM and
 * interruption of the wait).
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
                        struct ceph_statfs *buf)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs *h;
        int ret = -ENOMEM;

        req = alloc_generic_request(monc, GFP_NOFS);
        if (!req)
                goto out;

        req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
                                    true);
        if (!req->request)
                goto out;

        req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
        if (!req->reply)
                goto out;

        req->u.st = buf;
        req->request->hdr.version = cpu_to_le16(2);

        mutex_lock(&monc->mutex);
        register_generic_request(req);
        /* fill out request */
        h = req->request->front.iov_base;
        h->monhdr.have_version = 0;
        h->monhdr.session_mon = cpu_to_le16(-1);
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;
        h->contains_data_pool = (data_pool != CEPH_NOPOOL);
        h->data_pool = cpu_to_le64(data_pool);
        send_generic_request(monc, req);
        mutex_unlock(&monc->mutex);

        ret = wait_generic_request(req);
out:
        /* put_generic_request() is NULL-safe; releases msgs on failure */
        put_generic_request(req);
        return ret;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);
815 
/*
 * Handle an MMonGetVersionReply: record the newest version and
 * complete the matching generic request (looked up by the encoded
 * handle, which we set to the tid when sending).
 */
static void handle_get_version_reply(struct ceph_mon_client *monc,
                                     struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        u64 tid = le64_to_cpu(msg->hdr.tid);
        void *p = msg->front.iov_base;
        void *end = p + msg->front_alloc_len;
        u64 handle;

        dout("%s msg %p tid %llu\n", __func__, msg, tid);

        /* need handle + version, both u64 */
        ceph_decode_need(&p, end, 2*sizeof(u64), bad);
        handle = ceph_decode_64(&p);
        /* NOTE(review): a header tid of 0 is tolerated here -- confirm
         * which senders leave hdr.tid unset */
        if (tid != 0 && tid != handle)
                goto bad;

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, handle);
        if (!req) {
                /* unknown handle: canceled or duplicate reply; ignore */
                mutex_unlock(&monc->mutex);
                return;
        }

        req->result = 0;
        req->u.newest = ceph_decode_64(&p);
        __finish_generic_request(req);
        mutex_unlock(&monc->mutex);

        complete_generic_request(req);
        return;

bad:
        pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}
851 
/*
 * Build, register and send an MMonGetVersion request for map @what.
 *
 * @cb: if non-NULL, called on completion instead of waking waiters
 * @private_data: opaque value stashed in the request for @cb
 *
 * Returns the request with a ref held by the caller, or an ERR_PTR.
 */
static struct ceph_mon_generic_request *
__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
                        ceph_monc_callback_t cb, u64 private_data)
{
        struct ceph_mon_generic_request *req;

        req = alloc_generic_request(monc, GFP_NOIO);
        if (!req)
                goto err_put_req;

        /* front is exactly: u64 handle + length-prefixed string */
        req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
                                    sizeof(u64) + sizeof(u32) + strlen(what),
                                    GFP_NOIO, true);
        if (!req->request)
                goto err_put_req;

        req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
                                  true);
        if (!req->reply)
                goto err_put_req;

        req->complete_cb = cb;
        req->private_data = private_data;

        mutex_lock(&monc->mutex);
        register_generic_request(req);
        {
                void *p = req->request->front.iov_base;
                void *const end = p + req->request->front_alloc_len;

                ceph_encode_64(&p, req->tid); /* handle */
                ceph_encode_string(&p, end, what, strlen(what));
                WARN_ON(p != end);
        }
        send_generic_request(monc, req);
        mutex_unlock(&monc->mutex);

        return req;

err_put_req:
        /* NULL-safe; drops partial message allocations via release */
        put_generic_request(req);
        return ERR_PTR(-ENOMEM);
}
895 
896 /*
897  * Send MMonGetVersion and wait for the reply.
898  *
899  * @what: one of "mdsmap", "osdmap" or "monmap"
900  */
901 int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
902                           u64 *newest)
903 {
904         struct ceph_mon_generic_request *req;
905         int ret;
906 
907         req = __ceph_monc_get_version(monc, what, NULL, 0);
908         if (IS_ERR(req))
909                 return PTR_ERR(req);
910 
911         ret = wait_generic_request(req);
912         if (!ret)
913                 *newest = req->u.newest;
914 
915         put_generic_request(req);
916         return ret;
917 }
918 EXPORT_SYMBOL(ceph_monc_get_version);
919 
920 /*
921  * Send MMonGetVersion,
922  *
923  * @what: one of "mdsmap", "osdmap" or "monmap"
924  */
925 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
926                                 ceph_monc_callback_t cb, u64 private_data)
927 {
928         struct ceph_mon_generic_request *req;
929 
930         req = __ceph_monc_get_version(monc, what, cb, private_data);
931         if (IS_ERR(req))
932                 return PTR_ERR(req);
933 
934         put_generic_request(req);
935         return 0;
936 }
937 EXPORT_SYMBOL(ceph_monc_get_version_async);
938 
/*
 * Handle a MMonCommandAck: look up the originating request by tid,
 * record the monitor's result code and complete the request.
 */
static void handle_command_ack(struct ceph_mon_client *monc,
                               struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        void *p = msg->front.iov_base;
        void *const end = p + msg->front_alloc_len;
        u64 tid = le64_to_cpu(msg->hdr.tid);

        dout("%s msg %p tid %llu\n", __func__, msg, tid);

        /* need at least the request header plus the 32-bit result code */
        ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
                                                            sizeof(u32), bad);
        p += sizeof(struct ceph_mon_request_header);

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, tid);
        if (!req) {
                /* stale or unknown tid -- nothing to complete */
                mutex_unlock(&monc->mutex);
                return;
        }

        req->result = ceph_decode_32(&p);
        __finish_generic_request(req);
        mutex_unlock(&monc->mutex);

        /* complete after dropping monc->mutex */
        complete_generic_request(req);
        return;

bad:
        pr_err("corrupt mon_command ack, tid %llu\n", tid);
        ceph_msg_dump(msg);
}
971 
972 static __printf(2, 0)
973 int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
974                          va_list ap)
975 {
976         struct ceph_mon_generic_request *req;
977         struct ceph_mon_command *h;
978         int ret = -ENOMEM;
979         int len;
980 
981         req = alloc_generic_request(monc, GFP_NOIO);
982         if (!req)
983                 goto out;
984 
985         req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
986         if (!req->request)
987                 goto out;
988 
989         req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
990                                   true);
991         if (!req->reply)
992                 goto out;
993 
994         mutex_lock(&monc->mutex);
995         register_generic_request(req);
996         h = req->request->front.iov_base;
997         h->monhdr.have_version = 0;
998         h->monhdr.session_mon = cpu_to_le16(-1);
999         h->monhdr.session_mon_tid = 0;
1000         h->fsid = monc->monmap->fsid;
1001         h->num_strs = cpu_to_le32(1);
1002         len = vsprintf(h->str, fmt, ap);
1003         h->str_len = cpu_to_le32(len);
1004         send_generic_request(monc, req);
1005         mutex_unlock(&monc->mutex);
1006 
1007         ret = wait_generic_request(req);
1008 out:
1009         put_generic_request(req);
1010         return ret;
1011 }
1012 
1013 static __printf(2, 3)
1014 int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
1015 {
1016         va_list ap;
1017         int ret;
1018 
1019         va_start(ap, fmt);
1020         ret = do_mon_command_vargs(monc, fmt, ap);
1021         va_end(ap);
1022         return ret;
1023 }
1024 
/*
 * Ask the monitors to blocklist @client_addr, falling back to the
 * pre-octopus "blacklist" command, then wait for an osdmap that
 * reflects the change.  Returns 0 or a negative errno.
 */
int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
                            struct ceph_entity_addr *client_addr)
{
        int ret;

        ret = do_mon_command(monc,
                             "{ \"prefix\": \"osd blocklist\", \
                                \"blocklistop\": \"add\", \
                                \"addr\": \"%pISpc/%u\" }",
                             &client_addr->in_addr,
                             le32_to_cpu(client_addr->nonce));
        if (ret == -EINVAL) {
                /*
                 * The monitor returns EINVAL on an unrecognized command.
                 * Try the legacy command -- it is exactly the same except
                 * for the name.
                 */
                ret = do_mon_command(monc,
                                     "{ \"prefix\": \"osd blacklist\", \
                                        \"blacklistop\": \"add\", \
                                        \"addr\": \"%pISpc/%u\" }",
                                     &client_addr->in_addr,
                                     le32_to_cpu(client_addr->nonce));
        }
        if (ret)
                return ret;

        /*
         * Make sure we have the osdmap that includes the blocklist
         * entry.  This is needed to ensure that the OSDs pick up the
         * new blocklist before processing any future requests from
         * this client.
         */
        return ceph_wait_for_latest_osdmap(monc->client, 0);
}
1060 EXPORT_SYMBOL(ceph_monc_blocklist_add);
1061 
1062 /*
1063  * Resend pending generic requests.
1064  */
1065 static void __resend_generic_request(struct ceph_mon_client *monc)
1066 {
1067         struct ceph_mon_generic_request *req;
1068         struct rb_node *p;
1069 
1070         for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
1071                 req = rb_entry(p, struct ceph_mon_generic_request, node);
1072                 ceph_msg_revoke(req->request);
1073                 ceph_msg_revoke_incoming(req->reply);
1074                 ceph_con_send(&monc->con, ceph_msg_get(req->request));
1075         }
1076 }
1077 
1078 /*
1079  * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
1080  * renew/retry subscription as needed (in case it is timing out, or we
1081  * got an ENOMEM).  And keep the monitor connection alive.
1082  */
1083 static void delayed_work(struct work_struct *work)
1084 {
1085         struct ceph_mon_client *monc =
1086                 container_of(work, struct ceph_mon_client, delayed_work.work);
1087 
1088         mutex_lock(&monc->mutex);
1089         dout("%s mon%d\n", __func__, monc->cur_mon);
1090         if (monc->cur_mon < 0) {
1091                 goto out;
1092         }
1093 
1094         if (monc->hunting) {
1095                 dout("%s continuing hunt\n", __func__);
1096                 reopen_session(monc);
1097         } else {
1098                 int is_auth = ceph_auth_is_authenticated(monc->auth);
1099 
1100                 dout("%s is_authed %d\n", __func__, is_auth);
1101                 if (ceph_con_keepalive_expired(&monc->con,
1102                                                CEPH_MONC_PING_TIMEOUT)) {
1103                         dout("monc keepalive timeout\n");
1104                         is_auth = 0;
1105                         reopen_session(monc);
1106                 }
1107 
1108                 if (!monc->hunting) {
1109                         ceph_con_keepalive(&monc->con);
1110                         __validate_auth(monc);
1111                         un_backoff(monc);
1112                 }
1113 
1114                 if (is_auth &&
1115                     !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
1116                         unsigned long now = jiffies;
1117 
1118                         dout("%s renew subs? now %lu renew after %lu\n",
1119                              __func__, now, monc->sub_renew_after);
1120                         if (time_after_eq(now, monc->sub_renew_after))
1121                                 __send_subscribe(monc);
1122                 }
1123         }
1124         __schedule_delayed(monc);
1125 
1126 out:
1127         mutex_unlock(&monc->mutex);
1128 }
1129 
1130 /*
1131  * On startup, we build a temporary monmap populated with the IPs
1132  * provided by mount(2).
1133  */
1134 static int build_initial_monmap(struct ceph_mon_client *monc)
1135 {
1136         __le32 my_type = ceph_msgr2(monc->client) ?
1137                 CEPH_ENTITY_ADDR_TYPE_MSGR2 : CEPH_ENTITY_ADDR_TYPE_LEGACY;
1138         struct ceph_options *opt = monc->client->options;
1139         int num_mon = opt->num_mon;
1140         int i;
1141 
1142         /* build initial monmap */
1143         monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
1144                                GFP_KERNEL);
1145         if (!monc->monmap)
1146                 return -ENOMEM;
1147         monc->monmap->num_mon = num_mon;
1148 
1149         for (i = 0; i < num_mon; i++) {
1150                 struct ceph_entity_inst *inst = &monc->monmap->mon_inst[i];
1151 
1152                 memcpy(&inst->addr.in_addr, &opt->mon_addr[i].in_addr,
1153                        sizeof(inst->addr.in_addr));
1154                 inst->addr.type = my_type;
1155                 inst->addr.nonce = 0;
1156                 inst->name.type = CEPH_ENTITY_TYPE_MON;
1157                 inst->name.num = cpu_to_le64(i);
1158         }
1159         return 0;
1160 }
1161 
/*
 * Initialize the monitor client: build the initial monmap, set up the
 * auth handler, preallocate the fixed protocol messages and init the
 * embedded connection.  Returns 0 or a negative errno; on error all
 * partially-acquired resources are released.
 */
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
        int err;

        dout("init\n");
        memset(monc, 0, sizeof(*monc));
        monc->client = cl;
        mutex_init(&monc->mutex);

        err = build_initial_monmap(monc);
        if (err)
                goto out;

        /* connection */
        /* authentication */
        monc->auth = ceph_auth_init(cl->options->name, cl->options->key,
                                    cl->options->con_modes);
        if (IS_ERR(monc->auth)) {
                err = PTR_ERR(monc->auth);
                goto out_monmap;
        }
        monc->auth->want_keys =
                CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
                CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

        /* msgs -- preallocated up front, presumably so replies can be
         * received even under memory pressure (see mon_alloc_msg()) */
        err = -ENOMEM;
        monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
                                     sizeof(struct ceph_mon_subscribe_ack),
                                     GFP_KERNEL, true);
        if (!monc->m_subscribe_ack)
                goto out_auth;

        monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
                                         GFP_KERNEL, true);
        if (!monc->m_subscribe)
                goto out_subscribe_ack;

        monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
                                          GFP_KERNEL, true);
        if (!monc->m_auth_reply)
                goto out_subscribe;

        monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
        monc->pending_auth = 0;
        if (!monc->m_auth)
                goto out_auth_reply;

        ceph_con_init(&monc->con, monc, &mon_con_ops,
                      &monc->client->msgr);

        /* cur_mon < 0 means no session; see delayed_work()/mon_fault() */
        monc->cur_mon = -1;
        monc->had_a_connection = false;
        monc->hunt_mult = 1;

        INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
        monc->generic_request_tree = RB_ROOT;
        monc->last_tid = 0;

        monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;

        return 0;

/* unwind in reverse order of acquisition */
out_auth_reply:
        ceph_msg_put(monc->m_auth_reply);
out_subscribe:
        ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
        ceph_msg_put(monc->m_subscribe_ack);
out_auth:
        ceph_auth_destroy(monc->auth);
out_monmap:
        kfree(monc->monmap);
out:
        return err;
}
1238 EXPORT_SYMBOL(ceph_monc_init);
1239 
/*
 * Tear down the monitor client: close the session, stop the delayed
 * work, then release the auth handler, the preallocated messages and
 * the monmap.
 */
void ceph_monc_stop(struct ceph_mon_client *monc)
{
        dout("stop\n");

        mutex_lock(&monc->mutex);
        __close_session(monc);
        monc->hunting = false;
        /* cur_mon < 0 makes delayed_work()/mon_fault() bail out */
        monc->cur_mon = -1;
        mutex_unlock(&monc->mutex);

        cancel_delayed_work_sync(&monc->delayed_work);

        /*
         * flush msgr queue before we destroy ourselves to ensure that:
         *  - any work that references our embedded con is finished.
         *  - any osd_client or other work that may reference an authorizer
         *    finishes before we shut down the auth subsystem.
         */
        ceph_msgr_flush();

        ceph_auth_destroy(monc->auth);

        /* all generic requests should have been completed by now */
        WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));

        ceph_msg_put(monc->m_auth);
        ceph_msg_put(monc->m_auth_reply);
        ceph_msg_put(monc->m_subscribe);
        ceph_msg_put(monc->m_subscribe_ack);

        kfree(monc->monmap);
}
1271 EXPORT_SYMBOL(ceph_monc_stop);
1272 
1273 static void finish_hunting(struct ceph_mon_client *monc)
1274 {
1275         if (monc->hunting) {
1276                 dout("%s found mon%d\n", __func__, monc->cur_mon);
1277                 monc->hunting = false;
1278                 monc->had_a_connection = true;
1279                 un_backoff(monc);
1280                 __schedule_delayed(monc);
1281         }
1282 }
1283 
/*
 * Finalize an authentication attempt.  @auth_err is 0 or a negative
 * errno; @was_authed says whether we were already authenticated before
 * this exchange.  Caller holds monc->mutex.
 */
static void finish_auth(struct ceph_mon_client *monc, int auth_err,
                        bool was_authed)
{
        dout("%s auth_err %d was_authed %d\n", __func__, auth_err, was_authed);
        WARN_ON(auth_err > 0);

        monc->pending_auth = 0;
        if (auth_err) {
                /* propagate the failure to waiters on the auth waitqueue */
                monc->client->auth_err = auth_err;
                wake_up_all(&monc->client->auth_wq);
                return;
        }

        /* act only on the transition into the authenticated state */
        if (!was_authed && ceph_auth_is_authenticated(monc->auth)) {
                dout("%s authenticated, starting session global_id %llu\n",
                     __func__, monc->auth->global_id);

                /* the client is now addressed by its cluster global_id */
                monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
                monc->client->msgr.inst.name.num =
                                        cpu_to_le64(monc->auth->global_id);

                /* (re)establish subscriptions, resend pending requests */
                __send_subscribe(monc);
                __resend_generic_request(monc);

                pr_info("mon%d %s session established\n", monc->cur_mon,
                        ceph_pr_addr(&monc->con.peer_addr));
        }
}
1312 
/*
 * Process a MAuthReply.  A positive return from ceph_handle_auth_reply()
 * means another round trip is needed and the next request has already
 * been built into monc->m_auth's front buffer.
 * (NOTE(review): presumably the legacy path -- msgr2 uses the
 * mon_handle_auth_* connection callbacks below; confirm.)
 */
static void handle_auth_reply(struct ceph_mon_client *monc,
                              struct ceph_msg *msg)
{
        bool was_authed;
        int ret;

        mutex_lock(&monc->mutex);
        was_authed = ceph_auth_is_authenticated(monc->auth);
        ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
                                     msg->front.iov_len,
                                     monc->m_auth->front.iov_base,
                                     monc->m_auth->front_alloc_len);
        if (ret > 0) {
                /* another exchange required -- send what was just built */
                __send_prepared_auth_request(monc, ret);
        } else {
                /* done (ret == 0) or failed (ret < 0) */
                finish_auth(monc, ret, was_authed);
                finish_hunting(monc);
        }
        mutex_unlock(&monc->mutex);
}
1333 
1334 static int __validate_auth(struct ceph_mon_client *monc)
1335 {
1336         int ret;
1337 
1338         if (monc->pending_auth)
1339                 return 0;
1340 
1341         ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
1342                               monc->m_auth->front_alloc_len);
1343         if (ret <= 0)
1344                 return ret; /* either an error, or no need to authenticate */
1345         __send_prepared_auth_request(monc, ret);
1346         return 0;
1347 }
1348 
1349 int ceph_monc_validate_auth(struct ceph_mon_client *monc)
1350 {
1351         int ret;
1352 
1353         mutex_lock(&monc->mutex);
1354         ret = __validate_auth(monc);
1355         mutex_unlock(&monc->mutex);
1356         return ret;
1357 }
1358 EXPORT_SYMBOL(ceph_monc_validate_auth);
1359 
1360 static int mon_get_auth_request(struct ceph_connection *con,
1361                                 void *buf, int *buf_len,
1362                                 void **authorizer, int *authorizer_len)
1363 {
1364         struct ceph_mon_client *monc = con->private;
1365         int ret;
1366 
1367         mutex_lock(&monc->mutex);
1368         ret = ceph_auth_get_request(monc->auth, buf, *buf_len);
1369         mutex_unlock(&monc->mutex);
1370         if (ret < 0)
1371                 return ret;
1372 
1373         *buf_len = ret;
1374         *authorizer = NULL;
1375         *authorizer_len = 0;
1376         return 0;
1377 }
1378 
1379 static int mon_handle_auth_reply_more(struct ceph_connection *con,
1380                                       void *reply, int reply_len,
1381                                       void *buf, int *buf_len,
1382                                       void **authorizer, int *authorizer_len)
1383 {
1384         struct ceph_mon_client *monc = con->private;
1385         int ret;
1386 
1387         mutex_lock(&monc->mutex);
1388         ret = ceph_auth_handle_reply_more(monc->auth, reply, reply_len,
1389                                           buf, *buf_len);
1390         mutex_unlock(&monc->mutex);
1391         if (ret < 0)
1392                 return ret;
1393 
1394         *buf_len = ret;
1395         *authorizer = NULL;
1396         *authorizer_len = 0;
1397         return 0;
1398 }
1399 
/*
 * Connection callback: the auth exchange has concluded and the monitor
 * handed back session key material.  Always returns 0; a negative
 * verdict is delivered through finish_auth().
 */
static int mon_handle_auth_done(struct ceph_connection *con,
                                u64 global_id, void *reply, int reply_len,
                                u8 *session_key, int *session_key_len,
                                u8 *con_secret, int *con_secret_len)
{
        struct ceph_mon_client *monc = con->private;
        bool was_authed;
        int ret;

        mutex_lock(&monc->mutex);
        WARN_ON(!monc->hunting);        /* only expected while hunting */
        was_authed = ceph_auth_is_authenticated(monc->auth);
        ret = ceph_auth_handle_reply_done(monc->auth, global_id,
                                          reply, reply_len,
                                          session_key, session_key_len,
                                          con_secret, con_secret_len);
        finish_auth(monc, ret, was_authed);
        if (!ret)
                finish_hunting(monc);
        mutex_unlock(&monc->mutex);
        return 0;
}
1422 
/*
 * Connection callback: the monitor rejected our auth method/mode.  Let
 * the auth layer record the rejection, then fail the attempt with
 * -EACCES.  Always returns 0.
 */
static int mon_handle_auth_bad_method(struct ceph_connection *con,
                                      int used_proto, int result,
                                      const int *allowed_protos, int proto_cnt,
                                      const int *allowed_modes, int mode_cnt)
{
        struct ceph_mon_client *monc = con->private;
        bool was_authed;

        mutex_lock(&monc->mutex);
        WARN_ON(!monc->hunting);        /* only expected while hunting */
        was_authed = ceph_auth_is_authenticated(monc->auth);
        ceph_auth_handle_bad_method(monc->auth, used_proto, result,
                                    allowed_protos, proto_cnt,
                                    allowed_modes, mode_cnt);
        finish_auth(monc, -EACCES, was_authed);
        mutex_unlock(&monc->mutex);
        return 0;
}
1441 
1442 /*
1443  * handle incoming message
1444  */
1445 static void mon_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1446 {
1447         struct ceph_mon_client *monc = con->private;
1448         int type = le16_to_cpu(msg->hdr.type);
1449 
1450         switch (type) {
1451         case CEPH_MSG_AUTH_REPLY:
1452                 handle_auth_reply(monc, msg);
1453                 break;
1454 
1455         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1456                 handle_subscribe_ack(monc, msg);
1457                 break;
1458 
1459         case CEPH_MSG_STATFS_REPLY:
1460                 handle_statfs_reply(monc, msg);
1461                 break;
1462 
1463         case CEPH_MSG_MON_GET_VERSION_REPLY:
1464                 handle_get_version_reply(monc, msg);
1465                 break;
1466 
1467         case CEPH_MSG_MON_COMMAND_ACK:
1468                 handle_command_ack(monc, msg);
1469                 break;
1470 
1471         case CEPH_MSG_MON_MAP:
1472                 ceph_monc_handle_map(monc, msg);
1473                 break;
1474 
1475         case CEPH_MSG_OSD_MAP:
1476                 ceph_osdc_handle_map(&monc->client->osdc, msg);
1477                 break;
1478 
1479         default:
1480                 /* can the chained handler handle it? */
1481                 if (monc->client->extra_mon_dispatch &&
1482                     monc->client->extra_mon_dispatch(monc->client, msg) == 0)
1483                         break;
1484 
1485                 pr_err("received unknown message type %d %s\n", type,
1486                        ceph_msg_type_name(type));
1487         }
1488         ceph_msg_put(msg);
1489 }
1490 
1491 /*
1492  * Allocate memory for incoming message
1493  */
1494 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1495                                       struct ceph_msg_header *hdr,
1496                                       int *skip)
1497 {
1498         struct ceph_mon_client *monc = con->private;
1499         int type = le16_to_cpu(hdr->type);
1500         int front_len = le32_to_cpu(hdr->front_len);
1501         struct ceph_msg *m = NULL;
1502 
1503         *skip = 0;
1504 
1505         switch (type) {
1506         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1507                 m = ceph_msg_get(monc->m_subscribe_ack);
1508                 break;
1509         case CEPH_MSG_STATFS_REPLY:
1510         case CEPH_MSG_MON_COMMAND_ACK:
1511                 return get_generic_reply(con, hdr, skip);
1512         case CEPH_MSG_AUTH_REPLY:
1513                 m = ceph_msg_get(monc->m_auth_reply);
1514                 break;
1515         case CEPH_MSG_MON_GET_VERSION_REPLY:
1516                 if (le64_to_cpu(hdr->tid) != 0)
1517                         return get_generic_reply(con, hdr, skip);
1518 
1519                 /*
1520                  * Older OSDs don't set reply tid even if the original
1521                  * request had a non-zero tid.  Work around this weirdness
1522                  * by allocating a new message.
1523                  */
1524                 fallthrough;
1525         case CEPH_MSG_MON_MAP:
1526         case CEPH_MSG_MDS_MAP:
1527         case CEPH_MSG_OSD_MAP:
1528         case CEPH_MSG_FS_MAP_USER:
1529                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1530                 if (!m)
1531                         return NULL;    /* ENOMEM--return skip == 0 */
1532                 break;
1533         }
1534 
1535         if (!m) {
1536                 pr_info("alloc_msg unknown type %d\n", type);
1537                 *skip = 1;
1538         } else if (front_len > m->front_alloc_len) {
1539                 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
1540                         front_len, m->front_alloc_len,
1541                         (unsigned int)con->peer_name.type,
1542                         le64_to_cpu(con->peer_name.num));
1543                 ceph_msg_put(m);
1544                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1545         }
1546 
1547         return m;
1548 }
1549 
1550 /*
1551  * If the monitor connection resets, pick a new monitor and resubmit
1552  * any pending requests.
1553  */
1554 static void mon_fault(struct ceph_connection *con)
1555 {
1556         struct ceph_mon_client *monc = con->private;
1557 
1558         mutex_lock(&monc->mutex);
1559         dout("%s mon%d\n", __func__, monc->cur_mon);
1560         if (monc->cur_mon >= 0) {
1561                 if (!monc->hunting) {
1562                         dout("%s hunting for new mon\n", __func__);
1563                         reopen_session(monc);
1564                         __schedule_delayed(monc);
1565                 } else {
1566                         dout("%s already hunting\n", __func__);
1567                 }
1568         }
1569         mutex_unlock(&monc->mutex);
1570 }
1571 
1572 /*
1573  * We can ignore refcounting on the connection struct, as all references
1574  * will come from the messenger workqueue, which is drained prior to
1575  * mon_client destruction.
1576  */
1577 static struct ceph_connection *mon_get_con(struct ceph_connection *con)
1578 {
1579         return con;
1580 }
1581 
static void mon_put_con(struct ceph_connection *con)
{
        /* no-op "put": see the comment above mon_get_con() */
}
1585 
/* Monitor connection callbacks wired into the embedded messenger con. */
static const struct ceph_connection_operations mon_con_ops = {
        .get = mon_get_con,
        .put = mon_put_con,
        .alloc_msg = mon_alloc_msg,
        .dispatch = mon_dispatch,
        .fault = mon_fault,
        .get_auth_request = mon_get_auth_request,
        .handle_auth_reply_more = mon_handle_auth_reply_more,
        .handle_auth_done = mon_handle_auth_done,
        .handle_auth_bad_method = mon_handle_auth_bad_method,
};
1597 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~

kernel.org | git.kernel.org | LWN.net | Project Home | SVN repository | Mail admin

Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.

sflogo.php