1 /* 1 /* 2 * net/tipc/subscr.c: TIPC network topology se 2 * net/tipc/subscr.c: TIPC network topology service 3 * 3 * 4 * Copyright (c) 2000-2017, Ericsson AB !! 4 * Copyright (c) 2000-2006, Ericsson AB 5 * Copyright (c) 2005-2007, 2010-2013, Wind Ri !! 5 * Copyright (c) 2005-2007, 2010-2011, Wind River Systems 6 * Copyright (c) 2020-2021, Red Hat Inc << 7 * All rights reserved. 6 * All rights reserved. 8 * 7 * 9 * Redistribution and use in source and binary 8 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that t 9 * modification, are permitted provided that the following conditions are met: 11 * 10 * 12 * 1. Redistributions of source code must reta 11 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the 12 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must repr 13 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the 14 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials pro 15 * documentation and/or other materials provided with the distribution. 17 * 3. Neither the names of the copyright holde 16 * 3. Neither the names of the copyright holders nor the names of its 18 * contributors may be used to endorse or p 17 * contributors may be used to endorse or promote products derived from 19 * this software without specific prior wri 18 * this software without specific prior written permission. 20 * 19 * 21 * Alternatively, this software may be distrib 20 * Alternatively, this software may be distributed under the terms of the 22 * GNU General Public License ("GPL") version 21 * GNU General Public License ("GPL") version 2 as published by the Free 23 * Software Foundation. 22 * Software Foundation. 24 * 23 * 25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCL 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND F 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYR 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT L 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THE 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUD 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 34 * ARISING IN ANY WAY OUT OF THE USE OF THIS S 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 * POSSIBILITY OF SUCH DAMAGE. 34 * POSSIBILITY OF SUCH DAMAGE. 36 */ 35 */ 37 36 38 #include "core.h" 37 #include "core.h" 39 #include "name_table.h" 38 #include "name_table.h" >> 39 #include "port.h" 40 #include "subscr.h" 40 #include "subscr.h" 41 41 42 static void tipc_sub_send_event(struct tipc_su !! 42 /** 43 struct publica !! 43 * struct tipc_subscriber - TIPC network topology subscriber 44 u32 event) !! 44 * @port_ref: object reference to server port connecting to subscriber 45 { !! 45 * @lock: pointer to spinlock controlling access to subscriber's server port 46 struct tipc_subscr *s = &sub->evt.s; !! 46 * @subscriber_list: adjacent subscribers in top. server's list of subscribers 47 struct tipc_event *evt = &sub->evt; !! 47 * @subscription_list: list of subscription objects for this subscriber 48 !! 48 */ 49 if (sub->inactive) !! 49 struct tipc_subscriber { 50 return; !! 50 u32 port_ref; 51 tipc_evt_write(evt, event, event); !! 51 spinlock_t *lock; 52 if (p) { !! 52 struct list_head subscriber_list; 53 tipc_evt_write(evt, found_lowe !! 53 struct list_head subscription_list; 54 tipc_evt_write(evt, found_uppe !! 54 }; 55 tipc_evt_write(evt, port.ref, !! 55 56 tipc_evt_write(evt, port.node, !! 56 /** 57 } else { !! 57 * struct top_srv - TIPC network topology subscription service 58 tipc_evt_write(evt, found_lowe !! 58 * @setup_port: reference to TIPC port that handles subscription requests 59 tipc_evt_write(evt, found_uppe !! 59 * @subscription_count: number of active subscriptions (not subscribers!) 60 tipc_evt_write(evt, port.ref, !! 60 * @subscriber_list: list of ports subscribing to service 61 tipc_evt_write(evt, port.node, !! 61 * @lock: spinlock govering access to subscriber list 62 } !! 62 */ 63 tipc_topsrv_queue_evt(sub->net, sub->c !! 63 struct top_srv { >> 64 u32 setup_port; >> 65 atomic_t subscription_count; >> 66 struct list_head subscriber_list; >> 67 spinlock_t lock; >> 68 }; >> 69 >> 70 static struct top_srv topsrv; >> 71 >> 72 /** >> 73 * htohl - convert value to endianness used by destination >> 74 * @in: value to convert >> 75 * @swap: non-zero if endianness must be reversed >> 76 * >> 77 * Returns converted value >> 78 */ >> 79 static u32 htohl(u32 in, int swap) >> 80 { >> 81 return swap ? swab32(in) : in; 64 } 82 } 65 83 66 /** 84 /** 67 * tipc_sub_check_overlap - test for subscript !! 85 * subscr_send_event - send a message containing a tipc_event to the subscriber 68 * @subscribed: the service range subscribed f << 69 * @found: the service range we are checking f << 70 * 86 * 71 * Returns true if there is overlap, otherwise !! 87 * Note: Must not hold subscriber's server port lock, since tipc_send() will >> 88 * try to take the lock if the message is rejected and returned! 72 */ 89 */ 73 static bool tipc_sub_check_overlap(struct tipc !! 90 static void subscr_send_event(struct tipc_subscription *sub, 74 struct tipc !! 91 u32 found_lower, >> 92 u32 found_upper, >> 93 u32 event, >> 94 u32 port_ref, >> 95 u32 node) 75 { 96 { 76 u32 found_lower = found->lower; !! 97 struct iovec msg_sect; 77 u32 found_upper = found->upper; !! 98 >> 99 msg_sect.iov_base = (void *)&sub->evt; >> 100 msg_sect.iov_len = sizeof(struct tipc_event); 78 101 79 if (found_lower < subscribed->lower) !! 102 sub->evt.event = htohl(event, sub->swap); 80 found_lower = subscribed->lowe !! 103 sub->evt.found_lower = htohl(found_lower, sub->swap); 81 if (found_upper > subscribed->upper) !! 104 sub->evt.found_upper = htohl(found_upper, sub->swap); 82 found_upper = subscribed->uppe !! 105 sub->evt.port.ref = htohl(port_ref, sub->swap); 83 return found_lower <= found_upper; !! 106 sub->evt.port.node = htohl(node, sub->swap); >> 107 tipc_send(sub->server_ref, 1, &msg_sect, msg_sect.iov_len); 84 } 108 } 85 109 86 void tipc_sub_report_overlap(struct tipc_subsc !! 110 /** 87 struct publicatio !! 111 * tipc_subscr_overlap - test for subscription overlap with the given values 88 u32 event, bool m !! 112 * >> 113 * Returns 1 if there is overlap, otherwise 0. >> 114 */ >> 115 int tipc_subscr_overlap(struct tipc_subscription *sub, >> 116 u32 found_lower, >> 117 u32 found_upper) >> 118 89 { 119 { 90 struct tipc_service_range *sr = &sub-> !! 120 if (found_lower < sub->seq.lower) 91 u32 filter = sub->s.filter; !! 121 found_lower = sub->seq.lower; >> 122 if (found_upper > sub->seq.upper) >> 123 found_upper = sub->seq.upper; >> 124 if (found_lower > found_upper) >> 125 return 0; >> 126 return 1; >> 127 } 92 128 93 if (!tipc_sub_check_overlap(sr, &p->sr !! 129 /** 94 return; !! 130 * tipc_subscr_report_overlap - issue event if there is subscription overlap 95 if (!must && !(filter & TIPC_SUB_PORTS !! 131 * 96 return; !! 132 * Protected by nameseq.lock in name_table.c 97 if (filter & TIPC_SUB_CLUSTER_SCOPE && !! 133 */ >> 134 void tipc_subscr_report_overlap(struct tipc_subscription *sub, >> 135 u32 found_lower, >> 136 u32 found_upper, >> 137 u32 event, >> 138 u32 port_ref, >> 139 u32 node, >> 140 int must) >> 141 { >> 142 if (!tipc_subscr_overlap(sub, found_lower, found_upper)) 98 return; 143 return; 99 if (filter & TIPC_SUB_NODE_SCOPE && p- !! 144 if (!must && !(sub->filter & TIPC_SUB_PORTS)) 100 return; 145 return; 101 spin_lock(&sub->lock); !! 146 102 tipc_sub_send_event(sub, p, event); !! 147 subscr_send_event(sub, found_lower, found_upper, event, port_ref, node); 103 spin_unlock(&sub->lock); << 104 } 148 } 105 149 106 static void tipc_sub_timeout(struct timer_list !! 150 /** >> 151 * subscr_timeout - subscription timeout has occurred >> 152 */ >> 153 static void subscr_timeout(struct tipc_subscription *sub) 107 { 154 { 108 struct tipc_subscription *sub = from_t !! 155 struct tipc_port *server_port; >> 156 >> 157 /* Validate server port reference (in case subscriber is terminating) */ >> 158 server_port = tipc_port_lock(sub->server_ref); >> 159 if (server_port == NULL) >> 160 return; 109 161 110 spin_lock(&sub->lock); !! 162 /* Validate timeout (in case subscription is being cancelled) */ 111 tipc_sub_send_event(sub, NULL, TIPC_SU !! 163 if (sub->timeout == TIPC_WAIT_FOREVER) { 112 sub->inactive = true; !! 164 tipc_port_unlock(server_port); 113 spin_unlock(&sub->lock); !! 165 return; >> 166 } >> 167 >> 168 /* Unlink subscription from name table */ >> 169 tipc_nametbl_unsubscribe(sub); >> 170 >> 171 /* Unlink subscription from subscriber */ >> 172 list_del(&sub->subscription_list); >> 173 >> 174 /* Release subscriber's server port */ >> 175 tipc_port_unlock(server_port); >> 176 >> 177 /* Notify subscriber of timeout */ >> 178 subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, >> 179 TIPC_SUBSCR_TIMEOUT, 0, 0); >> 180 >> 181 /* Now destroy subscription */ >> 182 k_term_timer(&sub->timer); >> 183 kfree(sub); >> 184 atomic_dec(&topsrv.subscription_count); 114 } 185 } 115 186 116 static void tipc_sub_kref_release(struct kref !! 187 /** >> 188 * subscr_del - delete a subscription within a subscription list >> 189 * >> 190 * Called with subscriber port locked. >> 191 */ >> 192 static void subscr_del(struct tipc_subscription *sub) 117 { 193 { 118 kfree(container_of(kref, struct tipc_s !! 194 tipc_nametbl_unsubscribe(sub); >> 195 list_del(&sub->subscription_list); >> 196 kfree(sub); >> 197 atomic_dec(&topsrv.subscription_count); 119 } 198 } 120 199 121 void tipc_sub_put(struct tipc_subscription *su !! 200 /** >> 201 * subscr_terminate - terminate communication with a subscriber >> 202 * >> 203 * Called with subscriber port locked. Routine must temporarily release lock >> 204 * to enable subscription timeout routine(s) to finish without deadlocking; >> 205 * the lock is then reclaimed to allow caller to release it upon return. >> 206 * (This should work even in the unlikely event some other thread creates >> 207 * a new object reference in the interim that uses this lock; this routine will >> 208 * simply wait for it to be released, then claim it.) >> 209 */ >> 210 static void subscr_terminate(struct tipc_subscriber *subscriber) 122 { 211 { 123 kref_put(&subscription->kref, tipc_sub !! 212 u32 port_ref; >> 213 struct tipc_subscription *sub; >> 214 struct tipc_subscription *sub_temp; >> 215 >> 216 /* Invalidate subscriber reference */ >> 217 port_ref = subscriber->port_ref; >> 218 subscriber->port_ref = 0; >> 219 spin_unlock_bh(subscriber->lock); >> 220 >> 221 /* Sever connection to subscriber */ >> 222 tipc_shutdown(port_ref); >> 223 tipc_deleteport(port_ref); >> 224 >> 225 /* Destroy any existing subscriptions for subscriber */ >> 226 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, >> 227 subscription_list) { >> 228 if (sub->timeout != TIPC_WAIT_FOREVER) { >> 229 k_cancel_timer(&sub->timer); >> 230 k_term_timer(&sub->timer); >> 231 } >> 232 subscr_del(sub); >> 233 } >> 234 >> 235 /* Remove subscriber from topology server's subscriber list */ >> 236 spin_lock_bh(&topsrv.lock); >> 237 list_del(&subscriber->subscriber_list); >> 238 spin_unlock_bh(&topsrv.lock); >> 239 >> 240 /* Reclaim subscriber lock */ >> 241 spin_lock_bh(subscriber->lock); >> 242 >> 243 /* Now destroy subscriber */ >> 244 kfree(subscriber); 124 } 245 } 125 246 126 void tipc_sub_get(struct tipc_subscription *su !! 247 /** >> 248 * subscr_cancel - handle subscription cancellation request >> 249 * >> 250 * Called with subscriber port locked. Routine must temporarily release lock >> 251 * to enable the subscription timeout routine to finish without deadlocking; >> 252 * the lock is then reclaimed to allow caller to release it upon return. >> 253 * >> 254 * Note that fields of 's' use subscriber's endianness! >> 255 */ >> 256 static void subscr_cancel(struct tipc_subscr *s, >> 257 struct tipc_subscriber *subscriber) 127 { 258 { 128 kref_get(&subscription->kref); !! 259 struct tipc_subscription *sub; >> 260 struct tipc_subscription *sub_temp; >> 261 int found = 0; >> 262 >> 263 /* Find first matching subscription, exit if not found */ >> 264 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, >> 265 subscription_list) { >> 266 if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) { >> 267 found = 1; >> 268 break; >> 269 } >> 270 } >> 271 if (!found) >> 272 return; >> 273 >> 274 /* Cancel subscription timer (if used), then delete subscription */ >> 275 if (sub->timeout != TIPC_WAIT_FOREVER) { >> 276 sub->timeout = TIPC_WAIT_FOREVER; >> 277 spin_unlock_bh(subscriber->lock); >> 278 k_cancel_timer(&sub->timer); >> 279 k_term_timer(&sub->timer); >> 280 spin_lock_bh(subscriber->lock); >> 281 } >> 282 subscr_del(sub); 129 } 283 } 130 284 131 struct tipc_subscription *tipc_sub_subscribe(s !! 285 /** 132 s !! 286 * subscr_subscribe - create subscription for subscriber 133 i !! 287 * >> 288 * Called with subscriber port locked. >> 289 */ >> 290 static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, >> 291 struct tipc_subscriber *subscriber) 134 { 292 { 135 u32 lower = tipc_sub_read(s, seq.lower << 136 u32 upper = tipc_sub_read(s, seq.upper << 137 u32 filter = tipc_sub_read(s, filter); << 138 struct tipc_subscription *sub; 293 struct tipc_subscription *sub; 139 u32 timeout; !! 294 int swap; 140 295 141 if ((filter & TIPC_SUB_PORTS && filter !! 296 /* Determine subscriber's endianness */ 142 lower > upper) { !! 297 swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE)); 143 pr_warn("Subscription rejected !! 298 >> 299 /* Detect & process a subscription cancellation request */ >> 300 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { >> 301 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); >> 302 subscr_cancel(s, subscriber); >> 303 return NULL; >> 304 } >> 305 >> 306 /* Refuse subscription if global limit exceeded */ >> 307 if (atomic_read(&topsrv.subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { >> 308 pr_warn("Subscription rejected, limit reached (%u)\n", >> 309 TIPC_MAX_SUBSCRIPTIONS); >> 310 subscr_terminate(subscriber); 144 return NULL; 311 return NULL; 145 } 312 } >> 313 >> 314 /* Allocate subscription object */ 146 sub = kmalloc(sizeof(*sub), GFP_ATOMIC 315 sub = kmalloc(sizeof(*sub), GFP_ATOMIC); 147 if (!sub) { 316 if (!sub) { 148 pr_warn("Subscription rejected 317 pr_warn("Subscription rejected, no memory\n"); >> 318 subscr_terminate(subscriber); 149 return NULL; 319 return NULL; 150 } 320 } 151 INIT_LIST_HEAD(&sub->service_list); !! 321 152 INIT_LIST_HEAD(&sub->sub_list); !! 322 /* Initialize subscription object */ 153 sub->net = net; !! 323 sub->seq.type = htohl(s->seq.type, swap); 154 sub->conid = conid; !! 324 sub->seq.lower = htohl(s->seq.lower, swap); 155 sub->inactive = false; !! 325 sub->seq.upper = htohl(s->seq.upper, swap); 156 memcpy(&sub->evt.s, s, sizeof(*s)); !! 326 sub->timeout = htohl(s->timeout, swap); 157 sub->s.seq.type = tipc_sub_read(s, seq !! 327 sub->filter = htohl(s->filter, swap); 158 sub->s.seq.lower = lower; !! 328 if ((!(sub->filter & TIPC_SUB_PORTS) == 159 sub->s.seq.upper = upper; !! 329 !(sub->filter & TIPC_SUB_SERVICE)) || 160 sub->s.filter = filter; !! 330 (sub->seq.lower > sub->seq.upper)) { 161 sub->s.timeout = tipc_sub_read(s, time !! 331 pr_warn("Subscription rejected, illegal request\n"); 162 memcpy(sub->s.usr_handle, s->usr_handl << 163 spin_lock_init(&sub->lock); << 164 kref_init(&sub->kref); << 165 if (!tipc_nametbl_subscribe(sub)) { << 166 kfree(sub); 332 kfree(sub); >> 333 subscr_terminate(subscriber); 167 return NULL; 334 return NULL; 168 } 335 } 169 timer_setup(&sub->timer, tipc_sub_time !! 336 INIT_LIST_HEAD(&sub->nameseq_list); 170 timeout = tipc_sub_read(&sub->evt.s, t !! 337 list_add(&sub->subscription_list, &subscriber->subscription_list); 171 if (timeout != TIPC_WAIT_FOREVER) !! 338 sub->server_ref = subscriber->port_ref; 172 mod_timer(&sub->timer, jiffies !! 339 sub->swap = swap; >> 340 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); >> 341 atomic_inc(&topsrv.subscription_count); >> 342 if (sub->timeout != TIPC_WAIT_FOREVER) { >> 343 k_init_timer(&sub->timer, >> 344 (Handler)subscr_timeout, (unsigned long)sub); >> 345 k_start_timer(&sub->timer, sub->timeout); >> 346 } >> 347 173 return sub; 348 return sub; 174 } 349 } 175 350 176 void tipc_sub_unsubscribe(struct tipc_subscrip !! 351 /** >> 352 * subscr_conn_shutdown_event - handle termination request from subscriber >> 353 * >> 354 * Called with subscriber's server port unlocked. >> 355 */ >> 356 static void subscr_conn_shutdown_event(void *usr_handle, >> 357 u32 port_ref, >> 358 struct sk_buff **buf, >> 359 unsigned char const *data, >> 360 unsigned int size, >> 361 int reason) >> 362 { >> 363 struct tipc_subscriber *subscriber = usr_handle; >> 364 spinlock_t *subscriber_lock; >> 365 >> 366 if (tipc_port_lock(port_ref) == NULL) >> 367 return; >> 368 >> 369 subscriber_lock = subscriber->lock; >> 370 subscr_terminate(subscriber); >> 371 spin_unlock_bh(subscriber_lock); >> 372 } >> 373 >> 374 /** >> 375 * subscr_conn_msg_event - handle new subscription request from subscriber >> 376 * >> 377 * Called with subscriber's server port unlocked. >> 378 */ >> 379 static void subscr_conn_msg_event(void *usr_handle, >> 380 u32 port_ref, >> 381 struct sk_buff **buf, >> 382 const unchar *data, >> 383 u32 size) 177 { 384 { 178 tipc_nametbl_unsubscribe(sub); !! 385 struct tipc_subscriber *subscriber = usr_handle; 179 if (sub->evt.s.timeout != TIPC_WAIT_FO !! 386 spinlock_t *subscriber_lock; 180 del_timer_sync(&sub->timer); !! 387 struct tipc_subscription *sub; 181 list_del(&sub->sub_list); !! 388 182 tipc_sub_put(sub); !! 389 /* >> 390 * Lock subscriber's server port (& make a local copy of lock pointer, >> 391 * in case subscriber is deleted while processing subscription request) >> 392 */ >> 393 if (tipc_port_lock(port_ref) == NULL) >> 394 return; >> 395 >> 396 subscriber_lock = subscriber->lock; >> 397 >> 398 if (size != sizeof(struct tipc_subscr)) { >> 399 subscr_terminate(subscriber); >> 400 spin_unlock_bh(subscriber_lock); >> 401 } else { >> 402 sub = subscr_subscribe((struct tipc_subscr *)data, subscriber); >> 403 spin_unlock_bh(subscriber_lock); >> 404 if (sub != NULL) { >> 405 >> 406 /* >> 407 * We must release the server port lock before adding a >> 408 * subscription to the name table since TIPC needs to be >> 409 * able to (re)acquire the port lock if an event message >> 410 * issued by the subscription process is rejected and >> 411 * returned. The subscription cannot be deleted while >> 412 * it is being added to the name table because: >> 413 * a) the single-threading of the native API port code >> 414 * ensures the subscription cannot be cancelled and >> 415 * the subscriber connection cannot be broken, and >> 416 * b) the name table lock ensures the subscription >> 417 * timeout code cannot delete the subscription, >> 418 * so the subscription object is still protected. >> 419 */ >> 420 tipc_nametbl_subscribe(sub); >> 421 } >> 422 } >> 423 } >> 424 >> 425 /** >> 426 * subscr_named_msg_event - handle request to establish a new subscriber >> 427 */ >> 428 static void subscr_named_msg_event(void *usr_handle, >> 429 u32 port_ref, >> 430 struct sk_buff **buf, >> 431 const unchar *data, >> 432 u32 size, >> 433 u32 importance, >> 434 struct tipc_portid const *orig, >> 435 struct tipc_name_seq const *dest) >> 436 { >> 437 struct tipc_subscriber *subscriber; >> 438 u32 server_port_ref; >> 439 >> 440 /* Create subscriber object */ >> 441 subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC); >> 442 if (subscriber == NULL) { >> 443 pr_warn("Subscriber rejected, no memory\n"); >> 444 return; >> 445 } >> 446 INIT_LIST_HEAD(&subscriber->subscription_list); >> 447 INIT_LIST_HEAD(&subscriber->subscriber_list); >> 448 >> 449 /* Create server port & establish connection to subscriber */ >> 450 tipc_createport(subscriber, >> 451 importance, >> 452 NULL, >> 453 NULL, >> 454 subscr_conn_shutdown_event, >> 455 NULL, >> 456 NULL, >> 457 subscr_conn_msg_event, >> 458 NULL, >> 459 &subscriber->port_ref); >> 460 if (subscriber->port_ref == 0) { >> 461 pr_warn("Subscriber rejected, unable to create port\n"); >> 462 kfree(subscriber); >> 463 return; >> 464 } >> 465 tipc_connect(subscriber->port_ref, orig); >> 466 >> 467 /* Lock server port (& save lock address for future use) */ >> 468 subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock; >> 469 >> 470 /* Add subscriber to topology server's subscriber list */ >> 471 spin_lock_bh(&topsrv.lock); >> 472 list_add(&subscriber->subscriber_list, &topsrv.subscriber_list); >> 473 spin_unlock_bh(&topsrv.lock); >> 474 >> 475 /* Unlock server port */ >> 476 server_port_ref = subscriber->port_ref; >> 477 spin_unlock_bh(subscriber->lock); >> 478 >> 479 /* Send an ACK- to complete connection handshaking */ >> 480 tipc_send(server_port_ref, 0, NULL, 0); >> 481 >> 482 /* Handle optional subscription request */ >> 483 if (size != 0) { >> 484 subscr_conn_msg_event(subscriber, server_port_ref, >> 485 buf, data, size); >> 486 } >> 487 } >> 488 >> 489 int tipc_subscr_start(void) >> 490 { >> 491 struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV}; >> 492 int res; >> 493 >> 494 spin_lock_init(&topsrv.lock); >> 495 INIT_LIST_HEAD(&topsrv.subscriber_list); >> 496 >> 497 res = tipc_createport(NULL, >> 498 TIPC_CRITICAL_IMPORTANCE, >> 499 NULL, >> 500 NULL, >> 501 NULL, >> 502 NULL, >> 503 subscr_named_msg_event, >> 504 NULL, >> 505 NULL, >> 506 &topsrv.setup_port); >> 507 if (res) >> 508 goto failed; >> 509 >> 510 res = tipc_publish(topsrv.setup_port, TIPC_NODE_SCOPE, &seq); >> 511 if (res) { >> 512 tipc_deleteport(topsrv.setup_port); >> 513 topsrv.setup_port = 0; >> 514 goto failed; >> 515 } >> 516 >> 517 return 0; >> 518 >> 519 failed: >> 520 pr_err("Failed to create subscription service\n"); >> 521 return res; >> 522 } >> 523 >> 524 void tipc_subscr_stop(void) >> 525 { >> 526 struct tipc_subscriber *subscriber; >> 527 struct tipc_subscriber *subscriber_temp; >> 528 spinlock_t *subscriber_lock; >> 529 >> 530 if (topsrv.setup_port) { >> 531 tipc_deleteport(topsrv.setup_port); >> 532 topsrv.setup_port = 0; >> 533 >> 534 list_for_each_entry_safe(subscriber, subscriber_temp, >> 535 &topsrv.subscriber_list, >> 536 subscriber_list) { >> 537 subscriber_lock = subscriber->lock; >> 538 spin_lock_bh(subscriber_lock); >> 539 subscr_terminate(subscriber); >> 540 spin_unlock_bh(subscriber_lock); >> 541 } >> 542 } 183 } 543 } 184 544
Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.