// SPDX-License-Identifier: GPL-2.0-or-later
/* Processing of received RxRPC packets
 *
 * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include "ar-internal.h"

/* Override priority when generating ACKs for received DATA */
static const u8 rxrpc_ack_priority[RXRPC_ACK__INVALID] = {
	[RXRPC_ACK_IDLE]		= 1,
	[RXRPC_ACK_DELAY]		= 2,
	[RXRPC_ACK_REQUESTED]		= 3,
	[RXRPC_ACK_DUPLICATE]		= 4,
	[RXRPC_ACK_EXCEEDS_WINDOW]	= 5,
	[RXRPC_ACK_NOSPACE]		= 6,
	[RXRPC_ACK_OUT_OF_SEQUENCE]	= 7,
};

static void rxrpc_proto_abort(struct rxrpc_call *call, rxrpc_seq_t seq,
			      enum rxrpc_abort_reason why)
{
	rxrpc_abort_call(call, seq, RX_PROTOCOL_ERROR, -EBADMSG, why);
}

/*
 * Do TCP-style congestion management [RFC 5681].
 */
static void rxrpc_congestion_management(struct rxrpc_call *call,
					struct sk_buff *skb,
					struct rxrpc_ack_summary *summary,
					rxrpc_serial_t acked_serial)
{
	enum rxrpc_congest_change change = rxrpc_cong_no_change;
	unsigned int cumulative_acks = call->cong_cumul_acks;
	unsigned int cwnd = call->cong_cwnd;
	bool resend = false;

	summary->flight_size =
		(call->tx_top - call->acks_hard_ack) - summary->nr_acks;

	if (test_and_clear_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags)) {
		summary->retrans_timeo = true;
		call->cong_ssthresh = max_t(unsigned int,
					    summary->flight_size / 2, 2);
		cwnd = 1;
		if (cwnd >= call->cong_ssthresh &&
		    call->cong_mode == RXRPC_CALL_SLOW_START) {
			call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
			call->cong_tstamp = skb->tstamp;
			cumulative_acks = 0;
		}
	}

	cumulative_acks += summary->nr_new_acks;
	if (cumulative_acks > 255)
		cumulative_acks = 255;

	summary->cwnd = call->cong_cwnd;
	summary->ssthresh = call->cong_ssthresh;
	summary->cumulative_acks = cumulative_acks;
	summary->dup_acks = call->cong_dup_acks;

	switch (call->cong_mode) {
	case RXRPC_CALL_SLOW_START:
		if (summary->saw_nacks)
			goto packet_loss_detected;
		if (summary->cumulative_acks > 0)
			cwnd += 1;
		if (cwnd >= call->cong_ssthresh) {
			call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
			call->cong_tstamp = skb->tstamp;
		}
		goto out;

	case RXRPC_CALL_CONGEST_AVOIDANCE:
		if (summary->saw_nacks)
			goto packet_loss_detected;

		/* We analyse the number of packets that get ACK'd per RTT
		 * period and increase the window if we managed to fill it.
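		 *
		 * For example (illustrative figures only): srtt_us holds
		 * eight times the smoothed RTT in microseconds, so with
		 * srtt_us == 8000 the window is reconsidered roughly once
		 * per 1ms RTT, and cwnd grows by at most one packet per
		 * RTT - the additive-increase half of AIMD.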
		 */
		if (call->peer->rtt_count == 0)
			goto out;
		if (ktime_before(skb->tstamp,
				 ktime_add_us(call->cong_tstamp,
					      call->peer->srtt_us >> 3)))
			goto out_no_clear_ca;
		change = rxrpc_cong_rtt_window_end;
		call->cong_tstamp = skb->tstamp;
		if (cumulative_acks >= cwnd)
			cwnd++;
		goto out;

	case RXRPC_CALL_PACKET_LOSS:
		if (!summary->saw_nacks)
			goto resume_normality;

		if (summary->new_low_nack) {
			change = rxrpc_cong_new_low_nack;
			call->cong_dup_acks = 1;
			if (call->cong_extra > 1)
				call->cong_extra = 1;
			goto send_extra_data;
		}

		call->cong_dup_acks++;
		if (call->cong_dup_acks < 3)
			goto send_extra_data;

		change = rxrpc_cong_begin_retransmission;
		call->cong_mode = RXRPC_CALL_FAST_RETRANSMIT;
		call->cong_ssthresh = max_t(unsigned int,
					    summary->flight_size / 2, 2);
		cwnd = call->cong_ssthresh + 3;
		call->cong_extra = 0;
		call->cong_dup_acks = 0;
		resend = true;
		goto out;

	case RXRPC_CALL_FAST_RETRANSMIT:
		if (!summary->new_low_nack) {
			if (summary->nr_new_acks == 0)
				cwnd += 1;
			call->cong_dup_acks++;
			if (call->cong_dup_acks == 2) {
				change = rxrpc_cong_retransmit_again;
				call->cong_dup_acks = 0;
				resend = true;
			}
		} else {
			change = rxrpc_cong_progress;
			cwnd = call->cong_ssthresh;
			if (!summary->saw_nacks)
				goto resume_normality;
		}
		goto out;

	default:
		BUG();
		goto out;
	}

resume_normality:
	change = rxrpc_cong_cleared_nacks;
	call->cong_dup_acks = 0;
	call->cong_extra = 0;
	call->cong_tstamp = skb->tstamp;
	if (cwnd < call->cong_ssthresh)
		call->cong_mode = RXRPC_CALL_SLOW_START;
	else
		call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
out:
	cumulative_acks = 0;
out_no_clear_ca:
	if (cwnd >= RXRPC_TX_MAX_WINDOW)
		cwnd = RXRPC_TX_MAX_WINDOW;
	call->cong_cwnd = cwnd;
	call->cong_cumul_acks = cumulative_acks;
	summary->mode = call->cong_mode;
	trace_rxrpc_congest(call, summary, acked_serial, change);
	if (resend)
		rxrpc_resend(call, skb);
	return;

packet_loss_detected:
	change = rxrpc_cong_saw_nack;
	call->cong_mode = RXRPC_CALL_PACKET_LOSS;
	call->cong_dup_acks = 0;
	goto send_extra_data;

send_extra_data:
	/* Send some previously unsent DATA if we have some to advance the ACK
	 * state.
	 */
	if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) ||
	    summary->nr_acks != call->tx_top - call->acks_hard_ack) {
		call->cong_extra++;
		wake_up(&call->waitq);
	}
	goto out_no_clear_ca;
}

/*
 * Degrade the congestion window if we haven't transmitted a packet for >1RTT.
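 *
 * The in-flight estimate goes stale over an idle period, so cwnd is halved
 * (but not below RXRPC_MIN_CWND), ssthresh is kept at no less than 3/4 of
 * the old window and the call drops back into slow start.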
 */
void rxrpc_congestion_degrade(struct rxrpc_call *call)
{
	ktime_t rtt, now;

	if (call->cong_mode != RXRPC_CALL_SLOW_START &&
	    call->cong_mode != RXRPC_CALL_CONGEST_AVOIDANCE)
		return;
	if (__rxrpc_call_state(call) == RXRPC_CALL_CLIENT_AWAIT_REPLY)
		return;

	rtt = ns_to_ktime(call->peer->srtt_us * (1000 / 8));
	now = ktime_get_real();
	if (!ktime_before(ktime_add(call->tx_last_sent, rtt), now))
		return;

	trace_rxrpc_reset_cwnd(call, now);
	rxrpc_inc_stat(call->rxnet, stat_tx_data_cwnd_reset);
	call->tx_last_sent = now;
	call->cong_mode = RXRPC_CALL_SLOW_START;
	call->cong_ssthresh = max_t(unsigned int, call->cong_ssthresh,
				    call->cong_cwnd * 3 / 4);
	call->cong_cwnd = max_t(unsigned int, call->cong_cwnd / 2, RXRPC_MIN_CWND);
}

/*
 * Apply a hard ACK by advancing the Tx window.
 */
static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
				   struct rxrpc_ack_summary *summary)
{
	struct rxrpc_txbuf *txb;
	bool rot_last = false;

	list_for_each_entry_rcu(txb, &call->tx_buffer, call_link, false) {
		if (before_eq(txb->seq, call->acks_hard_ack))
			continue;
		if (txb->flags & RXRPC_LAST_PACKET) {
			set_bit(RXRPC_CALL_TX_LAST, &call->flags);
			rot_last = true;
		}
		if (txb->seq == to)
			break;
	}

	if (rot_last)
		set_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags);

	_enter("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);

	if (call->acks_lowest_nak == call->acks_hard_ack) {
		call->acks_lowest_nak = to;
	} else if (after(to, call->acks_lowest_nak)) {
		summary->new_low_nack = true;
		call->acks_lowest_nak = to;
	}

	smp_store_release(&call->acks_hard_ack, to);

	trace_rxrpc_txqueue(call, (rot_last ?
				   rxrpc_txqueue_rotate_last :
				   rxrpc_txqueue_rotate));
	wake_up(&call->waitq);
	return rot_last;
}

/*
 * End the transmission phase of a call.
 *
 * This occurs when we get an ACKALL packet, the first DATA packet of a reply,
 * or a final ACK packet.
 */
static void rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun,
			       enum rxrpc_abort_reason abort_why)
{
	ASSERT(test_bit(RXRPC_CALL_TX_LAST, &call->flags));

	call->resend_at = KTIME_MAX;
	trace_rxrpc_timer_can(call, rxrpc_timer_trace_resend);

	if (unlikely(call->cong_last_nack)) {
		rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
		call->cong_last_nack = NULL;
	}

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		if (reply_begun) {
			rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_RECV_REPLY);
			trace_rxrpc_txqueue(call, rxrpc_txqueue_end);
			break;
		}

		rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_REPLY);
		trace_rxrpc_txqueue(call, rxrpc_txqueue_await_reply);
		break;

	case RXRPC_CALL_SERVER_AWAIT_ACK:
		rxrpc_call_completed(call);
		trace_rxrpc_txqueue(call, rxrpc_txqueue_end);
		break;

	default:
		kdebug("end_tx %s", rxrpc_call_states[__rxrpc_call_state(call)]);
		rxrpc_proto_abort(call, call->tx_top, abort_why);
		break;
	}
}

/*
 * Begin the reply reception phase of a call.
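 *
 * The first DATA packet of the reply implicitly hard-ACKs the whole request,
 * so the entire Tx window is rotated and the Tx phase ended before the reply
 * data is queued.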
 */
static bool rxrpc_receiving_reply(struct rxrpc_call *call)
{
	struct rxrpc_ack_summary summary = { 0 };
	rxrpc_seq_t top = READ_ONCE(call->tx_top);

	if (call->ackr_reason) {
		call->delay_ack_at = KTIME_MAX;
		trace_rxrpc_timer_can(call, rxrpc_timer_trace_delayed_ack);
	}

	if (!test_bit(RXRPC_CALL_TX_LAST, &call->flags)) {
		if (!rxrpc_rotate_tx_window(call, top, &summary)) {
			rxrpc_proto_abort(call, top, rxrpc_eproto_early_reply);
			return false;
		}
	}

	rxrpc_end_tx_phase(call, true, rxrpc_eproto_unexpected_reply);
	return true;
}

/*
 * End the packet reception phase.
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
	rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);

	_enter("%d,%s", call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)]);

	trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
		rxrpc_call_completed(call);
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST:
		rxrpc_set_call_state(call, RXRPC_CALL_SERVER_ACK_REQUEST);
		call->expect_req_by = KTIME_MAX;
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_processing_op);
		break;

	default:
		break;
	}
}

static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
					  rxrpc_seq_t window, rxrpc_seq_t wtop)
{
	call->ackr_window = window;
	call->ackr_wtop = wtop;
}

/*
 * Push a DATA packet onto the Rx queue.
 */
static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
				   rxrpc_seq_t window, rxrpc_seq_t wtop,
				   enum rxrpc_receive_trace why)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	bool last = sp->hdr.flags & RXRPC_LAST_PACKET;

	__skb_queue_tail(&call->recvmsg_queue, skb);
	rxrpc_input_update_ack_window(call, window, wtop);
	trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq);
	if (last)
		rxrpc_end_rx_phase(call, sp->hdr.serial);
}

/*
 * Process a DATA packet.
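 *
 * The sequence number is classified against the receive window: anything
 * before ackr_window is a duplicate, anything after window + rx_winsize - 1
 * exceeds the window, a packet at the left edge is queued for recvmsg
 * (pulling in any consecutive packets parked on the out-of-sequence queue)
 * and anything else is held on the out-of-sequence queue and marked in the
 * SACK table.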
 */
static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
				 bool *_notify, rxrpc_serial_t *_ack_serial, int *_ack_reason)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct sk_buff *oos;
	rxrpc_serial_t serial = sp->hdr.serial;
	unsigned int sack = call->ackr_sack_base;
	rxrpc_seq_t window = call->ackr_window;
	rxrpc_seq_t wtop = call->ackr_wtop;
	rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
	rxrpc_seq_t seq = sp->hdr.seq;
	bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
	int ack_reason = -1;

	rxrpc_inc_stat(call->rxnet, stat_rx_data);
	if (sp->hdr.flags & RXRPC_REQUEST_ACK)
		rxrpc_inc_stat(call->rxnet, stat_rx_data_reqack);
	if (sp->hdr.flags & RXRPC_JUMBO_PACKET)
		rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo);

	if (last) {
		if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
		    seq + 1 != wtop)
			return rxrpc_proto_abort(call, seq, rxrpc_eproto_different_last);
	} else {
		if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
		    after_eq(seq, wtop)) {
			pr_warn("Packet beyond last: c=%x q=%x window=%x-%x wlimit=%x\n",
				call->debug_id, seq, window, wtop, wlimit);
			return rxrpc_proto_abort(call, seq, rxrpc_eproto_data_after_last);
		}
	}

	if (after(seq, call->rx_highest_seq))
		call->rx_highest_seq = seq;

	trace_rxrpc_rx_data(call->debug_id, seq, serial, sp->hdr.flags);

	if (before(seq, window)) {
		ack_reason = RXRPC_ACK_DUPLICATE;
		goto send_ack;
	}
	if (after(seq, wlimit)) {
		ack_reason = RXRPC_ACK_EXCEEDS_WINDOW;
		goto send_ack;
	}

	/* Queue the packet. */
	if (seq == window) {
		if (sp->hdr.flags & RXRPC_REQUEST_ACK)
			ack_reason = RXRPC_ACK_REQUESTED;
		/* Send an immediate ACK if we fill in a hole */
		else if (!skb_queue_empty(&call->rx_oos_queue))
			ack_reason = RXRPC_ACK_DELAY;

		window++;
		if (after(window, wtop)) {
			trace_rxrpc_sack(call, seq, sack, rxrpc_sack_none);
			wtop = window;
		} else {
			trace_rxrpc_sack(call, seq, sack, rxrpc_sack_advance);
			sack = (sack + 1) % RXRPC_SACK_SIZE;
		}

		rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg);

		spin_lock(&call->recvmsg_queue.lock);
		rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue);
		*_notify = true;

		while ((oos = skb_peek(&call->rx_oos_queue))) {
			struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

			if (after(osp->hdr.seq, window))
				break;

			__skb_unlink(oos, &call->rx_oos_queue);
			last = osp->hdr.flags & RXRPC_LAST_PACKET;
			seq = osp->hdr.seq;
			call->ackr_sack_table[sack] = 0;
			trace_rxrpc_sack(call, seq, sack, rxrpc_sack_fill);
			sack = (sack + 1) % RXRPC_SACK_SIZE;

			window++;
			rxrpc_input_queue_data(call, oos, window, wtop,
					       rxrpc_receive_queue_oos);
		}

		spin_unlock(&call->recvmsg_queue.lock);

		call->ackr_sack_base = sack;
	} else {
		unsigned int slot;

		ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE;

		slot = seq - window;
		sack = (sack + slot) % RXRPC_SACK_SIZE;

		if (call->ackr_sack_table[sack % RXRPC_SACK_SIZE]) {
			ack_reason = RXRPC_ACK_DUPLICATE;
			goto send_ack;
		}

		call->ackr_sack_table[sack % RXRPC_SACK_SIZE] |= 1;
		trace_rxrpc_sack(call, seq, sack, rxrpc_sack_oos);

		if (after(seq + 1, wtop)) {
			wtop = seq + 1;
			rxrpc_input_update_ack_window(call, window, wtop);
		}

		skb_queue_walk(&call->rx_oos_queue, oos) {
			struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

			if (after(osp->hdr.seq, seq)) {
				rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos);
				__skb_queue_before(&call->rx_oos_queue, oos, skb);
				goto oos_queued;
			}
		}

		rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos);
		__skb_queue_tail(&call->rx_oos_queue, skb);
oos_queued:
		trace_rxrpc_receive(call, last ? rxrpc_receive_oos_last : rxrpc_receive_oos,
				    sp->hdr.serial, sp->hdr.seq);
	}

send_ack:
	if (ack_reason >= 0) {
		if (rxrpc_ack_priority[ack_reason] > rxrpc_ack_priority[*_ack_reason]) {
			*_ack_serial = serial;
			*_ack_reason = ack_reason;
		} else if (rxrpc_ack_priority[ack_reason] == rxrpc_ack_priority[*_ack_reason] &&
			   ack_reason == RXRPC_ACK_REQUESTED) {
			*_ack_serial = serial;
			*_ack_reason = ack_reason;
		}
	}
}

/*
 * Split a jumbo packet and file the bits separately.
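 *
 * Every subpacket but the last carries RXRPC_JUMBO_DATALEN bytes of data
 * plus a secondary header (struct rxrpc_jumbo_header) describing the next
 * subpacket; the final subpacket takes whatever length remains.  Sequence
 * and serial numbers step up by one per subpacket.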
 */
static bool rxrpc_input_split_jumbo(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_jumbo_header jhdr;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb), *jsp;
	struct sk_buff *jskb;
	rxrpc_serial_t ack_serial = 0;
	unsigned int offset = sizeof(struct rxrpc_wire_header);
	unsigned int len = skb->len - offset;
	bool notify = false;
	int ack_reason = 0;

	while (sp->hdr.flags & RXRPC_JUMBO_PACKET) {
		if (len < RXRPC_JUMBO_SUBPKTLEN)
			goto protocol_error;
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			goto protocol_error;
		if (skb_copy_bits(skb, offset + RXRPC_JUMBO_DATALEN,
				  &jhdr, sizeof(jhdr)) < 0)
			goto protocol_error;

		jskb = skb_clone(skb, GFP_NOFS);
		if (!jskb) {
			kdebug("couldn't clone");
			return false;
		}
		rxrpc_new_skb(jskb, rxrpc_skb_new_jumbo_subpacket);
		jsp = rxrpc_skb(jskb);
		jsp->offset = offset;
		jsp->len = RXRPC_JUMBO_DATALEN;
		rxrpc_input_data_one(call, jskb, &notify, &ack_serial, &ack_reason);
		rxrpc_free_skb(jskb, rxrpc_skb_put_jumbo_subpacket);

		sp->hdr.flags = jhdr.flags;
		sp->hdr._rsvd = ntohs(jhdr._rsvd);
		sp->hdr.seq++;
		sp->hdr.serial++;
		offset += RXRPC_JUMBO_SUBPKTLEN;
		len -= RXRPC_JUMBO_SUBPKTLEN;
	}

	sp->offset = offset;
	sp->len = len;
	rxrpc_input_data_one(call, skb, &notify, &ack_serial, &ack_reason);

	if (ack_reason > 0) {
		rxrpc_send_ACK(call, ack_reason, ack_serial,
			       rxrpc_propose_ack_input_data);
	} else {
		call->ackr_nr_unacked++;
		rxrpc_propose_delay_ACK(call, sp->hdr.serial,
					rxrpc_propose_ack_input_data);
	}
	if (notify) {
		trace_rxrpc_notify_socket(call->debug_id, sp->hdr.serial);
		rxrpc_notify_socket(call);
	}
	return true;

protocol_error:
	return false;
}

/*
 * Process a DATA packet, adding the packet to the Rx ring.  The caller's
 * packet ref must be passed on or discarded.
 */
static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	rxrpc_serial_t serial = sp->hdr.serial;
	rxrpc_seq_t seq0 = sp->hdr.seq;

	_enter("{%x,%x,%x},{%u,%x}",
	       call->ackr_window, call->ackr_wtop, call->rx_highest_seq,
	       skb->len, seq0);

	if (__rxrpc_call_is_complete(call))
		return;

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		/* Received data implicitly ACKs all of the request
		 * packets we sent when we're acting as a client.
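		 * rxrpc_receiving_reply() rotates the whole Tx window
		 * and ends the Tx phase; if it can't, the call has
		 * already been aborted and we only notify the socket.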
		 */
		if (!rxrpc_receiving_reply(call))
			goto out_notify;
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST: {
		unsigned long timo = READ_ONCE(call->next_req_timo);

		if (timo) {
			ktime_t delay = ms_to_ktime(timo);

			call->expect_req_by = ktime_add(ktime_get_real(), delay);
			trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_idle);
		}
		break;
	}

	default:
		break;
	}

	if (!rxrpc_input_split_jumbo(call, skb)) {
		rxrpc_proto_abort(call, sp->hdr.seq, rxrpc_badmsg_bad_jumbo);
		goto out_notify;
	}
	return;

out_notify:
	trace_rxrpc_notify_socket(call->debug_id, serial);
	rxrpc_notify_socket(call);
	_leave(" [queued]");
}

/*
 * See if there's a cached RTT probe to complete.
 */
static void rxrpc_complete_rtt_probe(struct rxrpc_call *call,
				     ktime_t resp_time,
				     rxrpc_serial_t acked_serial,
				     rxrpc_serial_t ack_serial,
				     enum rxrpc_rtt_rx_trace type)
{
	rxrpc_serial_t orig_serial;
	unsigned long avail;
	ktime_t sent_at;
	bool matched = false;
	int i;

	avail = READ_ONCE(call->rtt_avail);
	smp_rmb(); /* Read avail bits before accessing data. */

	for (i = 0; i < ARRAY_SIZE(call->rtt_serial); i++) {
		if (!test_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &avail))
			continue;

		sent_at = call->rtt_sent_at[i];
		orig_serial = call->rtt_serial[i];

		if (orig_serial == acked_serial) {
			clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
			smp_mb(); /* Read data before setting avail bit */
			set_bit(i, &call->rtt_avail);
			rxrpc_peer_add_rtt(call, type, i, acked_serial, ack_serial,
					   sent_at, resp_time);
			matched = true;
		}

		/* If a later serial is being acked, then mark this slot as
		 * being available.
		 */
		if (after(acked_serial, orig_serial)) {
			trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_obsolete, i,
					   orig_serial, acked_serial, 0, 0);
			clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
			smp_wmb();
			set_bit(i, &call->rtt_avail);
		}
	}

	if (!matched)
		trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_lost, 9, 0, acked_serial, 0, 0);
}

/*
 * Process the extra information that may be appended to an ACK packet
 */
static void rxrpc_input_ack_trailer(struct rxrpc_call *call, struct sk_buff *skb,
				    struct rxrpc_acktrailer *trailer)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct rxrpc_peer *peer;
	unsigned int mtu;
	bool wake = false;
	u32 rwind = ntohl(trailer->rwind);

	if (rwind > RXRPC_TX_MAX_WINDOW)
		rwind = RXRPC_TX_MAX_WINDOW;
	if (call->tx_winsize != rwind) {
		if (rwind > call->tx_winsize)
			wake = true;
		trace_rxrpc_rx_rwind_change(call, sp->hdr.serial, rwind, wake);
		call->tx_winsize = rwind;
	}

	mtu = min(ntohl(trailer->maxMTU), ntohl(trailer->ifMTU));

	peer = call->peer;
	if (mtu < peer->maxdata) {
		spin_lock(&peer->lock);
		peer->maxdata = mtu;
		peer->mtu = mtu + peer->hdrsize;
		spin_unlock(&peer->lock);
	}

	if (wake)
		wake_up(&call->waitq);
}

/*
 * Determine how many nacks from the previous ACK have now been satisfied.
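 *
 * Three cases: if the new ACK's window starts at or beyond everything the
 * previous ACK covered, all of the old nacks count as newly acked; if it
 * starts at the same place, the old nacks are all retained; otherwise the
 * overlapping region is walked entry by entry.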
 */
static rxrpc_seq_t rxrpc_input_check_prev_ack(struct rxrpc_call *call,
					      struct rxrpc_ack_summary *summary,
					      rxrpc_seq_t seq)
{
	struct sk_buff *skb = call->cong_last_nack;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned int i, new_acks = 0, retained_nacks = 0;
	rxrpc_seq_t old_seq = sp->ack.first_ack;
	u8 *acks = skb->data + sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket);

	if (after_eq(seq, old_seq + sp->ack.nr_acks)) {
		summary->nr_new_acks += sp->ack.nr_nacks;
		summary->nr_new_acks += seq - (old_seq + sp->ack.nr_acks);
		summary->nr_retained_nacks = 0;
	} else if (seq == old_seq) {
		summary->nr_retained_nacks = sp->ack.nr_nacks;
	} else {
		for (i = 0; i < sp->ack.nr_acks; i++) {
			if (acks[i] == RXRPC_ACK_TYPE_NACK) {
				if (before(old_seq + i, seq))
					new_acks++;
				else
					retained_nacks++;
			}
		}

		summary->nr_new_acks += new_acks;
		summary->nr_retained_nacks = retained_nacks;
	}

	return old_seq + sp->ack.nr_acks;
}

/*
 * Process individual soft ACKs.
 *
 * Each ACK in the array corresponds to one packet and can be either an ACK or
 * a NAK.  If we find an explicitly NAK'd packet, we resend immediately;
 * packets that lie beyond the end of the ACK list are scheduled for resend by
 * the timer on the basis that the peer might just not have processed them at
 * the time the ACK was sent.
 */
static void rxrpc_input_soft_acks(struct rxrpc_call *call,
				  struct rxrpc_ack_summary *summary,
				  struct sk_buff *skb,
				  rxrpc_seq_t seq,
				  rxrpc_seq_t since)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned int i, old_nacks = 0;
	rxrpc_seq_t lowest_nak = seq + sp->ack.nr_acks;
	u8 *acks = skb->data + sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket);

	for (i = 0; i < sp->ack.nr_acks; i++) {
		if (acks[i] == RXRPC_ACK_TYPE_ACK) {
			summary->nr_acks++;
			if (after_eq(seq, since))
				summary->nr_new_acks++;
		} else {
			summary->saw_nacks = true;
			if (before(seq, since)) {
				/* Overlap with previous ACK */
				old_nacks++;
			} else {
				summary->nr_new_nacks++;
				sp->ack.nr_nacks++;
			}

			if (before(seq, lowest_nak))
				lowest_nak = seq;
		}
		seq++;
	}

	if (lowest_nak != call->acks_lowest_nak) {
		call->acks_lowest_nak = lowest_nak;
		summary->new_low_nack = true;
	}

	/* We *can* have more nacks than we did before - the peer is permitted
	 * to drop packets it has soft-acked and re-request them.  Further, it
	 * is possible for the nack distribution to change whilst the number
	 * of nacks stays the same or goes down.
	 */
	if (old_nacks < summary->nr_retained_nacks)
		summary->nr_new_acks += summary->nr_retained_nacks - old_nacks;
	summary->nr_retained_nacks = old_nacks;
}

/*
 * Return true if the ACK is valid - ie. it doesn't appear to have regressed
 * with respect to the ack state conveyed by preceding ACKs.
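 *
 * ACKs can be reordered by the network; processing a stale one would wind
 * the ack state backwards, so the caller discards any ACK whose firstPacket
 * has regressed.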
 */
static bool rxrpc_is_ack_valid(struct rxrpc_call *call,
			       rxrpc_seq_t first_pkt, rxrpc_seq_t prev_pkt)
{
	rxrpc_seq_t base = READ_ONCE(call->acks_first_seq);

	if (after(first_pkt, base))
		return true; /* The window advanced */

	if (before(first_pkt, base))
		return false; /* firstPacket regressed */

	if (after_eq(prev_pkt, call->acks_prev_seq))
		return true; /* previousPacket hasn't regressed. */

	/* Some rx implementations put a serial number in previousPacket. */
	if (after_eq(prev_pkt, base + call->tx_winsize))
		return false;
	return true;
}

/*
 * Process an ACK packet.
 *
 * ack.firstPacket is the sequence number of the first soft-ACK'd/NAK'd packet
 * in the ACK array.  Anything before that is hard-ACK'd and may be discarded.
 *
 * A hard-ACK means that a packet has been processed and may be discarded; a
 * soft-ACK means that the packet has been received, but the peer may yet
 * discard it and request retransmission, so it cannot be freed.  A phase is
 * complete when all packets are hard-ACK'd.
 */
static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_ack_summary summary = { 0 };
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct rxrpc_acktrailer trailer;
	rxrpc_serial_t ack_serial, acked_serial;
	rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt, since;
	int nr_acks, offset, ioffset;

	_enter("");

	offset = sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket);

	ack_serial = sp->hdr.serial;
	acked_serial = sp->ack.acked_serial;
	first_soft_ack = sp->ack.first_ack;
	prev_pkt = sp->ack.prev_ack;
	nr_acks = sp->ack.nr_acks;
	hard_ack = first_soft_ack - 1;
	summary.ack_reason = (sp->ack.reason < RXRPC_ACK__INVALID ?
			      sp->ack.reason : RXRPC_ACK__INVALID);

	trace_rxrpc_rx_ack(call, ack_serial, acked_serial,
			   first_soft_ack, prev_pkt,
			   summary.ack_reason, nr_acks);
	rxrpc_inc_stat(call->rxnet, stat_rx_acks[summary.ack_reason]);

	if (acked_serial != 0) {
		switch (summary.ack_reason) {
		case RXRPC_ACK_PING_RESPONSE:
			rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
						 rxrpc_rtt_rx_ping_response);
			break;
		case RXRPC_ACK_REQUESTED:
			rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
						 rxrpc_rtt_rx_requested_ack);
			break;
		default:
			rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
						 rxrpc_rtt_rx_other_ack);
			break;
		}
	}

	/* If we get an EXCEEDS_WINDOW ACK from the server, it probably
	 * indicates that the client address changed due to NAT.  The server
	 * lost the call because it switched to a different peer.
	 */
	if (unlikely(summary.ack_reason == RXRPC_ACK_EXCEEDS_WINDOW) &&
	    first_soft_ack == 1 &&
	    prev_pkt == 0 &&
	    rxrpc_is_client_call(call)) {
		rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
					  0, -ENETRESET);
		goto send_response;
	}

	/* If we get an OUT_OF_SEQUENCE ACK from the server, that can also
	 * indicate a change of address.  However, we can retransmit the call
	 * if we still have it buffered to the beginning.
	 */
	if (unlikely(summary.ack_reason == RXRPC_ACK_OUT_OF_SEQUENCE) &&
	    first_soft_ack == 1 &&
	    prev_pkt == 0 &&
	    call->acks_hard_ack == 0 &&
	    rxrpc_is_client_call(call)) {
		rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
					  0, -ENETRESET);
		goto send_response;
	}

	/* Discard any out-of-order or duplicate ACKs (outside lock). */
	if (!rxrpc_is_ack_valid(call, first_soft_ack, prev_pkt)) {
		trace_rxrpc_rx_discard_ack(call->debug_id, ack_serial,
					   first_soft_ack, call->acks_first_seq,
					   prev_pkt, call->acks_prev_seq);
		goto send_response;
	}
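
	/* The soft-ACK array is followed by three bytes of padding and then,
	 * optionally, by an information trailer (struct rxrpc_acktrailer)
	 * giving the peer's receive window and MTU sizes - hence the
	 * offset + nr_acks + 3 below.
	 */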
	trailer.maxMTU = 0;
	ioffset = offset + nr_acks + 3;
	if (skb->len >= ioffset + sizeof(trailer) &&
	    skb_copy_bits(skb, ioffset, &trailer, sizeof(trailer)) < 0)
		return rxrpc_proto_abort(call, 0, rxrpc_badmsg_short_ack_trailer);

	if (nr_acks > 0)
		skb_condense(skb);

	if (call->cong_last_nack) {
		since = rxrpc_input_check_prev_ack(call, &summary, first_soft_ack);
		rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
		call->cong_last_nack = NULL;
	} else {
		summary.nr_new_acks = first_soft_ack - call->acks_first_seq;
		call->acks_lowest_nak = first_soft_ack + nr_acks;
		since = first_soft_ack;
	}

	call->acks_latest_ts = skb->tstamp;
	call->acks_first_seq = first_soft_ack;
	call->acks_prev_seq = prev_pkt;

	switch (summary.ack_reason) {
	case RXRPC_ACK_PING:
		break;
	default:
		if (acked_serial && after(acked_serial, call->acks_highest_serial))
			call->acks_highest_serial = acked_serial;
		break;
	}

	/* Parse rwind and mtu sizes if provided. */
	if (trailer.maxMTU)
		rxrpc_input_ack_trailer(call, skb, &trailer);

	if (first_soft_ack == 0)
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_zero);

	/* Ignore ACKs unless we are or have just been transmitting. */
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
	case RXRPC_CALL_SERVER_SEND_REPLY:
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		break;
	default:
		goto send_response;
	}

	if (before(hard_ack, call->acks_hard_ack) ||
	    after(hard_ack, call->tx_top))
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_outside_window);
	if (nr_acks > call->tx_top - hard_ack)
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_sack_overflow);

	if (after(hard_ack, call->acks_hard_ack)) {
		if (rxrpc_rotate_tx_window(call, hard_ack, &summary)) {
			rxrpc_end_tx_phase(call, false, rxrpc_eproto_unexpected_ack);
			goto send_response;
		}
	}

	if (nr_acks > 0) {
		if (offset > (int)skb->len - nr_acks)
			return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_short_sack);
		rxrpc_input_soft_acks(call, &summary, skb, first_soft_ack, since);
		rxrpc_get_skb(skb, rxrpc_skb_get_last_nack);
		call->cong_last_nack = skb;
	}

	if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) &&
	    summary.nr_acks == call->tx_top - hard_ack &&
	    rxrpc_is_client_call(call))
		rxrpc_propose_ping(call, ack_serial,
				   rxrpc_propose_ack_ping_for_lost_reply);

	rxrpc_congestion_management(call, skb, &summary, acked_serial);

send_response:
	if (summary.ack_reason == RXRPC_ACK_PING)
		rxrpc_send_ACK(call, RXRPC_ACK_PING_RESPONSE, ack_serial,
			       rxrpc_propose_ack_respond_to_ping);
	else if (sp->hdr.flags & RXRPC_REQUEST_ACK)
		rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, ack_serial,
			       rxrpc_propose_ack_respond_to_ack);
}

/*
 * Process an ACKALL packet.
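 *
 * An ACKALL hard-ACKs every DATA packet transmitted so far without citing a
 * sequence number, so the Tx window is rotated all the way up to tx_top.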
 */
static void rxrpc_input_ackall(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_ack_summary summary = { 0 };

	if (rxrpc_rotate_tx_window(call, call->tx_top, &summary))
		rxrpc_end_tx_phase(call, false, rxrpc_eproto_unexpected_ackall);
}

/*
 * Process an ABORT packet directed at a call.
 */
static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

	trace_rxrpc_rx_abort(call, sp->hdr.serial, skb->priority);

	rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
				  skb->priority, -ECONNABORTED);
}

/*
 * Process an incoming call packet.
 */
void rxrpc_input_call_packet(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned long timo;

	_enter("%p,%p", call, skb);

	if (sp->hdr.serviceId != call->dest_srx.srx_service)
		call->dest_srx.srx_service = sp->hdr.serviceId;
	if ((int)sp->hdr.serial - (int)call->rx_serial > 0)
		call->rx_serial = sp->hdr.serial;
	if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
		set_bit(RXRPC_CALL_RX_HEARD, &call->flags);

	timo = READ_ONCE(call->next_rx_timo);
	if (timo) {
		ktime_t delay = ms_to_ktime(timo);

		call->expect_rx_by = ktime_add(ktime_get_real(), delay);
		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_expect_rx);
	}

	switch (sp->hdr.type) {
	case RXRPC_PACKET_TYPE_DATA:
		return rxrpc_input_data(call, skb);

	case RXRPC_PACKET_TYPE_ACK:
		return rxrpc_input_ack(call, skb);

	case RXRPC_PACKET_TYPE_BUSY:
		/* Just ignore BUSY packets from the server; the retry and
		 * lifespan timers will take care of business.  BUSY packets
		 * from the client don't make sense.
		 */
		return;

	case RXRPC_PACKET_TYPE_ABORT:
		return rxrpc_input_abort(call, skb);

	case RXRPC_PACKET_TYPE_ACKALL:
		return rxrpc_input_ackall(call, skb);

	default:
		break;
	}
}

/*
 * Handle a new service call on a channel implicitly completing the preceding
 * call on that channel.  This does not apply to client conns.
 *
 * TODO: If callNumber > call_id + 1, renegotiate security.
 */
void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
{
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		rxrpc_call_completed(call);
		fallthrough;
	case RXRPC_CALL_COMPLETE:
		break;
	default:
		rxrpc_abort_call(call, 0, RX_CALL_DEAD, -ESHUTDOWN,
				 rxrpc_eproto_improper_term);
		trace_rxrpc_improper_term(call);
		break;
	}

	rxrpc_input_call_event(call, skb);
}