Merge branch 'mptcp-rx-path-refactor'

Matthieu Baerts says:

====================
mptcp: rx path refactor

Paolo worked on this RX path refactor for these two main reasons:

- Currently, the MPTCP RX path introduces quite a bit of 'exceptional'
  accounting/locking processing WRT to plain TCP, adding up to the
  implementation complexity in a miserable way.

- The performance gap WRT plain TCP for single subflow connections is
  quite measurable.

The present refactor addresses both the above items: most of the
additional complexity is dropped, and single stream performances
increase measurably, from 55Gbps to 71Gbps in Paolo's loopback test.
As a reference, plain TCP was around 84Gbps on the same host.

The above comes to a price: the patch are invasive, even in subtle ways.

Note: patch 5/7 removes the sk_forward_alloc_get() helper, which caused
some trivial modifications in different places in the net tree: sockets,
IPv4, sched. That's why a few more people have been Cc here. Feel free
to only look at this patch 5/7.
====================

Link: https://patch.msgid.link/20250218-net-next-mptcp-rx-path-refactor-v1-0-4a47d90d7998@kernel.org
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
This commit is contained in:
Jakub Kicinski
2025-02-19 19:05:30 -08:00
9 changed files with 134 additions and 289 deletions
-13
View File
@@ -1285,10 +1285,6 @@ struct proto {
unsigned int inuse_idx;
#endif
#if IS_ENABLED(CONFIG_MPTCP)
int (*forward_alloc_get)(const struct sock *sk);
#endif
bool (*stream_memory_free)(const struct sock *sk, int wake);
bool (*sock_is_readable)(struct sock *sk);
/* Memory pressure */
@@ -1349,15 +1345,6 @@ int sock_load_diag_module(int family, int protocol);
INDIRECT_CALLABLE_DECLARE(bool tcp_stream_memory_free(const struct sock *sk, int wake));
static inline int sk_forward_alloc_get(const struct sock *sk)
{
#if IS_ENABLED(CONFIG_MPTCP)
if (sk->sk_prot->forward_alloc_get)
return sk->sk_prot->forward_alloc_get(sk);
#endif
return READ_ONCE(sk->sk_forward_alloc);
}
static inline bool __sk_stream_memory_free(const struct sock *sk, int wake)
{
if (READ_ONCE(sk->sk_wmem_queued) >= READ_ONCE(sk->sk_sndbuf))
+1 -1
View File
@@ -3882,7 +3882,7 @@ void sk_get_meminfo(const struct sock *sk, u32 *mem)
mem[SK_MEMINFO_RCVBUF] = READ_ONCE(sk->sk_rcvbuf);
mem[SK_MEMINFO_WMEM_ALLOC] = sk_wmem_alloc_get(sk);
mem[SK_MEMINFO_SNDBUF] = READ_ONCE(sk->sk_sndbuf);
mem[SK_MEMINFO_FWD_ALLOC] = sk_forward_alloc_get(sk);
mem[SK_MEMINFO_FWD_ALLOC] = READ_ONCE(sk->sk_forward_alloc);
mem[SK_MEMINFO_WMEM_QUEUED] = READ_ONCE(sk->sk_wmem_queued);
mem[SK_MEMINFO_OPTMEM] = atomic_read(&sk->sk_omem_alloc);
mem[SK_MEMINFO_BACKLOG] = READ_ONCE(sk->sk_backlog.len);
+1 -1
View File
@@ -153,7 +153,7 @@ void inet_sock_destruct(struct sock *sk)
WARN_ON_ONCE(atomic_read(&sk->sk_rmem_alloc));
WARN_ON_ONCE(refcount_read(&sk->sk_wmem_alloc));
WARN_ON_ONCE(sk->sk_wmem_queued);
WARN_ON_ONCE(sk_forward_alloc_get(sk));
WARN_ON_ONCE(sk->sk_forward_alloc);
kfree(rcu_dereference_protected(inet->inet_opt, 1));
dst_release(rcu_dereference_protected(sk->sk_dst_cache, 1));
+1 -1
View File
@@ -282,7 +282,7 @@ int inet_sk_diag_fill(struct sock *sk, struct inet_connection_sock *icsk,
struct inet_diag_meminfo minfo = {
.idiag_rmem = sk_rmem_alloc_get(sk),
.idiag_wmem = READ_ONCE(sk->sk_wmem_queued),
.idiag_fmem = sk_forward_alloc_get(sk),
.idiag_fmem = READ_ONCE(sk->sk_forward_alloc),
.idiag_tmem = sk_wmem_alloc_get(sk),
};
+4 -23
View File
@@ -40,17 +40,17 @@ void mptcp_fastopen_subflow_synack_set_params(struct mptcp_subflow_context *subf
tp->copied_seq += skb->len;
subflow->ssn_offset += skb->len;
/* initialize a dummy sequence number, we will update it at MPC
* completion, if needed
*/
/* Only the sequence delta is relevant */
MPTCP_SKB_CB(skb)->map_seq = -skb->len;
MPTCP_SKB_CB(skb)->end_seq = 0;
MPTCP_SKB_CB(skb)->offset = 0;
MPTCP_SKB_CB(skb)->has_rxtstamp = TCP_SKB_CB(skb)->has_rxtstamp;
MPTCP_SKB_CB(skb)->cant_coalesce = 1;
mptcp_data_lock(sk);
DEBUG_NET_WARN_ON_ONCE(sock_owned_by_user_nocheck(sk));
mptcp_set_owner_r(skb, sk);
skb_set_owner_r(skb, sk);
__skb_queue_tail(&sk->sk_receive_queue, skb);
mptcp_sk(sk)->bytes_received += skb->len;
@@ -58,22 +58,3 @@ void mptcp_fastopen_subflow_synack_set_params(struct mptcp_subflow_context *subf
mptcp_data_unlock(sk);
}
void __mptcp_fastopen_gen_msk_ackseq(struct mptcp_sock *msk, struct mptcp_subflow_context *subflow,
const struct mptcp_options_received *mp_opt)
{
struct sock *sk = (struct sock *)msk;
struct sk_buff *skb;
skb = skb_peek_tail(&sk->sk_receive_queue);
if (skb) {
WARN_ON_ONCE(MPTCP_SKB_CB(skb)->end_seq);
pr_debug("msk %p moving seq %llx -> %llx end_seq %llx -> %llx\n", sk,
MPTCP_SKB_CB(skb)->map_seq, MPTCP_SKB_CB(skb)->map_seq + msk->ack_seq,
MPTCP_SKB_CB(skb)->end_seq, MPTCP_SKB_CB(skb)->end_seq + msk->ack_seq);
MPTCP_SKB_CB(skb)->map_seq += msk->ack_seq;
MPTCP_SKB_CB(skb)->end_seq += msk->ack_seq;
}
pr_debug("msk=%p ack_seq=%llx\n", msk, msk->ack_seq);
}
+101 -216
View File
@@ -118,24 +118,14 @@ static void mptcp_drop(struct sock *sk, struct sk_buff *skb)
__kfree_skb(skb);
}
static void mptcp_rmem_fwd_alloc_add(struct sock *sk, int size)
{
WRITE_ONCE(mptcp_sk(sk)->rmem_fwd_alloc,
mptcp_sk(sk)->rmem_fwd_alloc + size);
}
static void mptcp_rmem_charge(struct sock *sk, int size)
{
mptcp_rmem_fwd_alloc_add(sk, -size);
}
static bool mptcp_try_coalesce(struct sock *sk, struct sk_buff *to,
struct sk_buff *from)
{
bool fragstolen;
int delta;
if (MPTCP_SKB_CB(from)->offset ||
if (unlikely(MPTCP_SKB_CB(to)->cant_coalesce) ||
MPTCP_SKB_CB(from)->offset ||
((to->len + from->len) > (sk->sk_rcvbuf >> 3)) ||
!skb_try_coalesce(to, from, &fragstolen, &delta))
return false;
@@ -150,7 +140,7 @@ static bool mptcp_try_coalesce(struct sock *sk, struct sk_buff *to,
* negative one
*/
atomic_add(delta, &sk->sk_rmem_alloc);
mptcp_rmem_charge(sk, delta);
sk_mem_charge(sk, delta);
kfree_skb_partial(from, fragstolen);
return true;
@@ -165,44 +155,6 @@ static bool mptcp_ooo_try_coalesce(struct mptcp_sock *msk, struct sk_buff *to,
return mptcp_try_coalesce((struct sock *)msk, to, from);
}
static void __mptcp_rmem_reclaim(struct sock *sk, int amount)
{
amount >>= PAGE_SHIFT;
mptcp_rmem_charge(sk, amount << PAGE_SHIFT);
__sk_mem_reduce_allocated(sk, amount);
}
static void mptcp_rmem_uncharge(struct sock *sk, int size)
{
struct mptcp_sock *msk = mptcp_sk(sk);
int reclaimable;
mptcp_rmem_fwd_alloc_add(sk, size);
reclaimable = msk->rmem_fwd_alloc - sk_unused_reserved_mem(sk);
/* see sk_mem_uncharge() for the rationale behind the following schema */
if (unlikely(reclaimable >= PAGE_SIZE))
__mptcp_rmem_reclaim(sk, reclaimable);
}
static void mptcp_rfree(struct sk_buff *skb)
{
unsigned int len = skb->truesize;
struct sock *sk = skb->sk;
atomic_sub(len, &sk->sk_rmem_alloc);
mptcp_rmem_uncharge(sk, len);
}
void mptcp_set_owner_r(struct sk_buff *skb, struct sock *sk)
{
skb_orphan(skb);
skb->sk = sk;
skb->destructor = mptcp_rfree;
atomic_add(skb->truesize, &sk->sk_rmem_alloc);
mptcp_rmem_charge(sk, skb->truesize);
}
/* "inspired" by tcp_data_queue_ofo(), main differences:
* - use mptcp seqs
* - don't cope with sacks
@@ -315,25 +267,7 @@ merge_right:
end:
skb_condense(skb);
mptcp_set_owner_r(skb, sk);
}
static bool mptcp_rmem_schedule(struct sock *sk, struct sock *ssk, int size)
{
struct mptcp_sock *msk = mptcp_sk(sk);
int amt, amount;
if (size <= msk->rmem_fwd_alloc)
return true;
size -= msk->rmem_fwd_alloc;
amt = sk_mem_pages(size);
amount = amt << PAGE_SHIFT;
if (!__sk_mem_raise_allocated(sk, size, amt, SK_MEM_RECV))
return false;
mptcp_rmem_fwd_alloc_add(sk, amount);
return true;
skb_set_owner_r(skb, sk);
}
static bool __mptcp_move_skb(struct mptcp_sock *msk, struct sock *ssk,
@@ -351,7 +285,7 @@ static bool __mptcp_move_skb(struct mptcp_sock *msk, struct sock *ssk,
skb_orphan(skb);
/* try to fetch required memory from subflow */
if (!mptcp_rmem_schedule(sk, ssk, skb->truesize)) {
if (!sk_rmem_schedule(sk, skb, skb->truesize)) {
MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_RCVPRUNED);
goto drop;
}
@@ -366,6 +300,7 @@ static bool __mptcp_move_skb(struct mptcp_sock *msk, struct sock *ssk,
MPTCP_SKB_CB(skb)->end_seq = MPTCP_SKB_CB(skb)->map_seq + copy_len;
MPTCP_SKB_CB(skb)->offset = offset;
MPTCP_SKB_CB(skb)->has_rxtstamp = has_rxtstamp;
MPTCP_SKB_CB(skb)->cant_coalesce = 0;
if (MPTCP_SKB_CB(skb)->map_seq == msk->ack_seq) {
/* in sequence */
@@ -375,7 +310,7 @@ static bool __mptcp_move_skb(struct mptcp_sock *msk, struct sock *ssk,
if (tail && mptcp_try_coalesce(sk, tail, skb))
return true;
mptcp_set_owner_r(skb, sk);
skb_set_owner_r(skb, sk);
__skb_queue_tail(&sk->sk_receive_queue, skb);
return true;
} else if (after64(MPTCP_SKB_CB(skb)->map_seq, msk->ack_seq)) {
@@ -561,7 +496,7 @@ static void mptcp_cleanup_rbuf(struct mptcp_sock *msk, int copied)
bool cleanup, rx_empty;
cleanup = (space > 0) && (space >= (old_space << 1)) && copied;
rx_empty = !__mptcp_rmem(sk) && copied;
rx_empty = !sk_rmem_alloc_get(sk) && copied;
mptcp_for_each_subflow(msk, subflow) {
struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
@@ -634,27 +569,13 @@ static void mptcp_dss_corruption(struct mptcp_sock *msk, struct sock *ssk)
}
static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
struct sock *ssk,
unsigned int *bytes)
struct sock *ssk)
{
struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
struct sock *sk = (struct sock *)msk;
unsigned int moved = 0;
bool more_data_avail;
struct tcp_sock *tp;
bool done = false;
int sk_rbuf;
sk_rbuf = READ_ONCE(sk->sk_rcvbuf);
if (!(sk->sk_userlocks & SOCK_RCVBUF_LOCK)) {
int ssk_rbuf = READ_ONCE(ssk->sk_rcvbuf);
if (unlikely(ssk_rbuf > sk_rbuf)) {
WRITE_ONCE(sk->sk_rcvbuf, ssk_rbuf);
sk_rbuf = ssk_rbuf;
}
}
bool ret = false;
pr_debug("msk=%p ssk=%p\n", msk, ssk);
tp = tcp_sk(ssk);
@@ -664,20 +585,16 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
struct sk_buff *skb;
bool fin;
if (sk_rmem_alloc_get(sk) > sk->sk_rcvbuf)
break;
/* try to move as much data as available */
map_remaining = subflow->map_data_len -
mptcp_subflow_get_map_offset(subflow);
skb = skb_peek(&ssk->sk_receive_queue);
if (!skb) {
/* With racing move_skbs_to_msk() and __mptcp_move_skbs(),
* a different CPU can have already processed the pending
* data, stop here or we can enter an infinite loop
*/
if (!moved)
done = true;
if (unlikely(!skb))
break;
}
if (__mptcp_check_fallback(msk)) {
/* Under fallback skbs have no MPTCP extension and TCP could
@@ -690,19 +607,13 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
offset = seq - TCP_SKB_CB(skb)->seq;
fin = TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN;
if (fin) {
done = true;
if (fin)
seq++;
}
if (offset < skb->len) {
size_t len = skb->len - offset;
if (tp->urg_data)
done = true;
if (__mptcp_move_skb(msk, ssk, skb, offset, len))
moved += len;
ret = __mptcp_move_skb(msk, ssk, skb, offset, len) || ret;
seq += len;
if (unlikely(map_remaining < len)) {
@@ -716,22 +627,16 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
}
sk_eat_skb(ssk, skb);
done = true;
}
WRITE_ONCE(tp->copied_seq, seq);
more_data_avail = mptcp_subflow_data_available(ssk);
if (atomic_read(&sk->sk_rmem_alloc) > sk_rbuf) {
done = true;
break;
}
} while (more_data_avail);
if (moved > 0)
if (ret)
msk->last_data_recv = tcp_jiffies32;
*bytes += moved;
return done;
return ret;
}
static bool __mptcp_ofo_queue(struct mptcp_sock *msk)
@@ -825,9 +730,9 @@ void __mptcp_error_report(struct sock *sk)
static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
{
struct sock *sk = (struct sock *)msk;
unsigned int moved = 0;
bool moved;
__mptcp_move_skbs_from_subflow(msk, ssk, &moved);
moved = __mptcp_move_skbs_from_subflow(msk, ssk);
__mptcp_ofo_queue(msk);
if (unlikely(ssk->sk_err)) {
if (!sock_owned_by_user(sk))
@@ -843,14 +748,29 @@ static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
*/
if (mptcp_pending_data_fin(sk, NULL))
mptcp_schedule_work(sk);
return moved > 0;
return moved;
}
static void __mptcp_rcvbuf_update(struct sock *sk, struct sock *ssk)
{
if (unlikely(ssk->sk_rcvbuf > sk->sk_rcvbuf))
WRITE_ONCE(sk->sk_rcvbuf, ssk->sk_rcvbuf);
}
static void __mptcp_data_ready(struct sock *sk, struct sock *ssk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
__mptcp_rcvbuf_update(sk, ssk);
/* Wake-up the reader only for in-sequence data */
if (move_skbs_to_msk(msk, ssk) && mptcp_epollin_ready(sk))
sk->sk_data_ready(sk);
}
void mptcp_data_ready(struct sock *sk, struct sock *ssk)
{
struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
struct mptcp_sock *msk = mptcp_sk(sk);
int sk_rbuf, ssk_rbuf;
/* The peer can send data while we are shutting down this
* subflow at msk destruction time, but we must avoid enqueuing
@@ -859,19 +779,11 @@ void mptcp_data_ready(struct sock *sk, struct sock *ssk)
if (unlikely(subflow->disposable))
return;
ssk_rbuf = READ_ONCE(ssk->sk_rcvbuf);
sk_rbuf = READ_ONCE(sk->sk_rcvbuf);
if (unlikely(ssk_rbuf > sk_rbuf))
sk_rbuf = ssk_rbuf;
/* over limit? can't append more skbs to msk, Also, no need to wake-up*/
if (__mptcp_rmem(sk) > sk_rbuf)
return;
/* Wake-up the reader only for in-sequence data */
mptcp_data_lock(sk);
if (move_skbs_to_msk(msk, ssk) && mptcp_epollin_ready(sk))
sk->sk_data_ready(sk);
if (!sock_owned_by_user(sk))
__mptcp_data_ready(sk, ssk);
else
__set_bit(MPTCP_DEQUEUE, &mptcp_sk(sk)->cb_flags);
mptcp_data_unlock(sk);
}
@@ -950,20 +862,6 @@ bool mptcp_schedule_work(struct sock *sk)
return false;
}
static struct sock *mptcp_subflow_recv_lookup(const struct mptcp_sock *msk)
{
struct mptcp_subflow_context *subflow;
msk_owned_by_me(msk);
mptcp_for_each_subflow(msk, subflow) {
if (READ_ONCE(subflow->data_avail))
return mptcp_subflow_tcp_sock(subflow);
}
return NULL;
}
static bool mptcp_skb_can_collapse_to(u64 write_seq,
const struct sk_buff *skb,
const struct mptcp_ext *mpext)
@@ -1944,16 +1842,17 @@ do_error:
static void mptcp_rcv_space_adjust(struct mptcp_sock *msk, int copied);
static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk,
static int __mptcp_recvmsg_mskq(struct sock *sk,
struct msghdr *msg,
size_t len, int flags,
struct scm_timestamping_internal *tss,
int *cmsg_flags)
{
struct mptcp_sock *msk = mptcp_sk(sk);
struct sk_buff *skb, *tmp;
int copied = 0;
skb_queue_walk_safe(&msk->receive_queue, skb, tmp) {
skb_queue_walk_safe(&sk->sk_receive_queue, skb, tmp) {
u32 offset = MPTCP_SKB_CB(skb)->offset;
u32 data_len = skb->len - offset;
u32 count = min_t(size_t, len - copied, data_len);
@@ -1985,10 +1884,11 @@ static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk,
}
if (!(flags & MSG_PEEK)) {
/* we will bulk release the skb memory later */
/* avoid the indirect call, we know the destructor is sock_wfree */
skb->destructor = NULL;
WRITE_ONCE(msk->rmem_released, msk->rmem_released + skb->truesize);
__skb_unlink(skb, &msk->receive_queue);
atomic_sub(skb->truesize, &sk->sk_rmem_alloc);
sk_mem_uncharge(sk, skb->truesize);
__skb_unlink(skb, &sk->sk_receive_queue);
__kfree_skb(skb);
msk->bytes_consumed += count;
}
@@ -2101,66 +2001,65 @@ new_measure:
msk->rcvq_space.time = mstamp;
}
static void __mptcp_update_rmem(struct sock *sk)
static struct mptcp_subflow_context *
__mptcp_first_ready_from(struct mptcp_sock *msk,
struct mptcp_subflow_context *subflow)
{
struct mptcp_sock *msk = mptcp_sk(sk);
struct mptcp_subflow_context *start_subflow = subflow;
if (!msk->rmem_released)
return;
atomic_sub(msk->rmem_released, &sk->sk_rmem_alloc);
mptcp_rmem_uncharge(sk, msk->rmem_released);
WRITE_ONCE(msk->rmem_released, 0);
while (!READ_ONCE(subflow->data_avail)) {
subflow = mptcp_next_subflow(msk, subflow);
if (subflow == start_subflow)
return NULL;
}
return subflow;
}
static void __mptcp_splice_receive_queue(struct sock *sk)
static bool __mptcp_move_skbs(struct sock *sk)
{
struct mptcp_subflow_context *subflow;
struct mptcp_sock *msk = mptcp_sk(sk);
bool ret = false;
skb_queue_splice_tail_init(&sk->sk_receive_queue, &msk->receive_queue);
}
if (list_empty(&msk->conn_list))
return false;
static bool __mptcp_move_skbs(struct mptcp_sock *msk)
{
struct sock *sk = (struct sock *)msk;
unsigned int moved = 0;
bool ret, done;
/* verify we can move any data from the subflow, eventually updating */
if (!(sk->sk_userlocks & SOCK_RCVBUF_LOCK))
mptcp_for_each_subflow(msk, subflow)
__mptcp_rcvbuf_update(sk, subflow->tcp_sock);
do {
struct sock *ssk = mptcp_subflow_recv_lookup(msk);
subflow = list_first_entry(&msk->conn_list,
struct mptcp_subflow_context, node);
for (;;) {
struct sock *ssk;
bool slowpath;
/* we can have data pending in the subflows only if the msk
* receive buffer was full at subflow_data_ready() time,
* that is an unlikely slow path.
/*
* As an optimization avoid traversing the subflows list
* and ev. acquiring the subflow socket lock before baling out
*/
if (likely(!ssk))
if (sk_rmem_alloc_get(sk) > sk->sk_rcvbuf)
break;
slowpath = lock_sock_fast(ssk);
mptcp_data_lock(sk);
__mptcp_update_rmem(sk);
done = __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
mptcp_data_unlock(sk);
subflow = __mptcp_first_ready_from(msk, subflow);
if (!subflow)
break;
ssk = mptcp_subflow_tcp_sock(subflow);
slowpath = lock_sock_fast(ssk);
ret = __mptcp_move_skbs_from_subflow(msk, ssk) || ret;
if (unlikely(ssk->sk_err))
__mptcp_error_report(sk);
unlock_sock_fast(ssk, slowpath);
} while (!done);
/* acquire the data lock only if some input data is pending */
ret = moved > 0;
if (!RB_EMPTY_ROOT(&msk->out_of_order_queue) ||
!skb_queue_empty_lockless(&sk->sk_receive_queue)) {
mptcp_data_lock(sk);
__mptcp_update_rmem(sk);
ret |= __mptcp_ofo_queue(msk);
__mptcp_splice_receive_queue(sk);
mptcp_data_unlock(sk);
subflow = mptcp_next_subflow(msk, subflow);
}
__mptcp_ofo_queue(msk);
if (ret)
mptcp_check_data_fin((struct sock *)msk);
return !skb_queue_empty(&msk->receive_queue);
return ret;
}
static unsigned int mptcp_inq_hint(const struct sock *sk)
@@ -2168,7 +2067,7 @@ static unsigned int mptcp_inq_hint(const struct sock *sk)
const struct mptcp_sock *msk = mptcp_sk(sk);
const struct sk_buff *skb;
skb = skb_peek(&msk->receive_queue);
skb = skb_peek(&sk->sk_receive_queue);
if (skb) {
u64 hint_val = READ_ONCE(msk->ack_seq) - MPTCP_SKB_CB(skb)->map_seq;
@@ -2214,7 +2113,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
while (copied < len) {
int err, bytes_read;
bytes_read = __mptcp_recvmsg_mskq(msk, msg, len - copied, flags, &tss, &cmsg_flags);
bytes_read = __mptcp_recvmsg_mskq(sk, msg, len - copied, flags, &tss, &cmsg_flags);
if (unlikely(bytes_read < 0)) {
if (!copied)
copied = bytes_read;
@@ -2223,7 +2122,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
copied += bytes_read;
if (skb_queue_empty(&msk->receive_queue) && __mptcp_move_skbs(msk))
if (skb_queue_empty(&sk->sk_receive_queue) && __mptcp_move_skbs(sk))
continue;
/* only the MPTCP socket status is relevant here. The exit
@@ -2249,7 +2148,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
/* race breaker: the shutdown could be after the
* previous receive queue check
*/
if (__mptcp_move_skbs(msk))
if (__mptcp_move_skbs(sk))
continue;
break;
}
@@ -2293,9 +2192,8 @@ out_err:
}
}
pr_debug("msk=%p rx queue empty=%d:%d copied=%d\n",
msk, skb_queue_empty_lockless(&sk->sk_receive_queue),
skb_queue_empty(&msk->receive_queue), copied);
pr_debug("msk=%p rx queue empty=%d copied=%d\n",
msk, skb_queue_empty(&sk->sk_receive_queue), copied);
release_sock(sk);
return copied;
@@ -2822,11 +2720,8 @@ static void __mptcp_init_sock(struct sock *sk)
INIT_LIST_HEAD(&msk->join_list);
INIT_LIST_HEAD(&msk->rtx_queue);
INIT_WORK(&msk->work, mptcp_worker);
__skb_queue_head_init(&msk->receive_queue);
msk->out_of_order_queue = RB_ROOT;
msk->first_pending = NULL;
WRITE_ONCE(msk->rmem_fwd_alloc, 0);
WRITE_ONCE(msk->rmem_released, 0);
msk->timer_ival = TCP_RTO_MIN;
msk->scaling_ratio = TCP_DEFAULT_SCALING_RATIO;
@@ -3052,8 +2947,6 @@ static void __mptcp_destroy_sock(struct sock *sk)
sk->sk_prot->destroy(sk);
WARN_ON_ONCE(READ_ONCE(msk->rmem_fwd_alloc));
WARN_ON_ONCE(msk->rmem_released);
sk_stream_kill_queues(sk);
xfrm_sk_free_policy(sk);
@@ -3405,18 +3298,12 @@ void mptcp_destroy_common(struct mptcp_sock *msk, unsigned int flags)
mptcp_for_each_subflow_safe(msk, subflow, tmp)
__mptcp_close_ssk(sk, mptcp_subflow_tcp_sock(subflow), subflow, flags);
/* move to sk_receive_queue, sk_stream_kill_queues will purge it */
mptcp_data_lock(sk);
skb_queue_splice_tail_init(&msk->receive_queue, &sk->sk_receive_queue);
__skb_queue_purge(&sk->sk_receive_queue);
skb_rbtree_purge(&msk->out_of_order_queue);
mptcp_data_unlock(sk);
/* move all the rx fwd alloc into the sk_mem_reclaim_final in
* inet_sock_destruct() will dispose it
*/
sk_forward_alloc_add(sk, msk->rmem_fwd_alloc);
WRITE_ONCE(msk->rmem_fwd_alloc, 0);
mptcp_token_destroy(msk);
mptcp_pm_free_anno_list(msk);
mptcp_free_local_addr_list(msk);
@@ -3453,7 +3340,8 @@ void __mptcp_check_push(struct sock *sk, struct sock *ssk)
#define MPTCP_FLAGS_PROCESS_CTX_NEED (BIT(MPTCP_PUSH_PENDING) | \
BIT(MPTCP_RETRANSMIT) | \
BIT(MPTCP_FLUSH_JOIN_LIST))
BIT(MPTCP_FLUSH_JOIN_LIST) | \
BIT(MPTCP_DEQUEUE))
/* processes deferred events and flush wmem */
static void mptcp_release_cb(struct sock *sk)
@@ -3487,6 +3375,11 @@ static void mptcp_release_cb(struct sock *sk)
__mptcp_push_pending(sk, 0);
if (flags & BIT(MPTCP_RETRANSMIT))
__mptcp_retrans(sk);
if ((flags & BIT(MPTCP_DEQUEUE)) && __mptcp_move_skbs(sk)) {
/* notify ack seq update */
mptcp_cleanup_rbuf(msk, 0);
sk->sk_data_ready(sk);
}
cond_resched();
spin_lock_bh(&sk->sk_lock.slock);
@@ -3506,8 +3399,6 @@ static void mptcp_release_cb(struct sock *sk)
if (__test_and_clear_bit(MPTCP_SYNC_SNDBUF, &msk->cb_flags))
__mptcp_sync_sndbuf(sk);
}
__mptcp_update_rmem(sk);
}
/* MP_JOIN client subflow must wait for 4th ack before sending any data:
@@ -3678,12 +3569,6 @@ static void mptcp_shutdown(struct sock *sk, int how)
__mptcp_wr_shutdown(sk);
}
static int mptcp_forward_alloc_get(const struct sock *sk)
{
return READ_ONCE(sk->sk_forward_alloc) +
READ_ONCE(mptcp_sk(sk)->rmem_fwd_alloc);
}
static int mptcp_ioctl_outq(const struct mptcp_sock *msk, u64 v)
{
const struct sock *sk = (void *)msk;
@@ -3724,7 +3609,8 @@ static int mptcp_ioctl(struct sock *sk, int cmd, int *karg)
return -EINVAL;
lock_sock(sk);
__mptcp_move_skbs(msk);
if (__mptcp_move_skbs(sk))
mptcp_cleanup_rbuf(msk, 0);
*karg = mptcp_inq_hint(sk);
release_sock(sk);
break;
@@ -3841,7 +3727,6 @@ static struct proto mptcp_prot = {
.hash = mptcp_hash,
.unhash = mptcp_unhash,
.get_port = mptcp_get_port,
.forward_alloc_get = mptcp_forward_alloc_get,
.stream_memory_free = mptcp_stream_memory_free,
.sockets_allocated = &mptcp_sockets_allocated,
+7 -15
View File
@@ -124,12 +124,14 @@
#define MPTCP_FLUSH_JOIN_LIST 5
#define MPTCP_SYNC_STATE 6
#define MPTCP_SYNC_SNDBUF 7
#define MPTCP_DEQUEUE 8
struct mptcp_skb_cb {
u64 map_seq;
u64 end_seq;
u32 offset;
u8 has_rxtstamp:1;
u8 has_rxtstamp;
u8 cant_coalesce;
};
#define MPTCP_SKB_CB(__skb) ((struct mptcp_skb_cb *)&((__skb)->cb[0]))
@@ -279,7 +281,6 @@ struct mptcp_sock {
u64 rcv_data_fin_seq;
u64 bytes_retrans;
u64 bytes_consumed;
int rmem_fwd_alloc;
int snd_burst;
int old_wspace;
u64 recovery_snd_nxt; /* in recovery mode accept up to this seq;
@@ -294,7 +295,6 @@ struct mptcp_sock {
u32 last_ack_recv;
unsigned long timer_ival;
u32 token;
int rmem_released;
unsigned long flags;
unsigned long cb_flags;
bool recovery; /* closing subflow write queue reinjected */
@@ -324,7 +324,6 @@ struct mptcp_sock {
struct work_struct work;
struct sk_buff *ooo_last_skb;
struct rb_root out_of_order_queue;
struct sk_buff_head receive_queue;
struct list_head conn_list;
struct list_head rtx_queue;
struct mptcp_data_frag *first_pending;
@@ -355,6 +354,8 @@ struct mptcp_sock {
list_for_each_entry(__subflow, &((__msk)->conn_list), node)
#define mptcp_for_each_subflow_safe(__msk, __subflow, __tmp) \
list_for_each_entry_safe(__subflow, __tmp, &((__msk)->conn_list), node)
#define mptcp_next_subflow(__msk, __subflow) \
list_next_entry_circular(__subflow, &((__msk)->conn_list), node)
extern struct genl_family mptcp_genl_family;
@@ -381,14 +382,6 @@ static inline void msk_owned_by_me(const struct mptcp_sock *msk)
#define mptcp_sk(ptr) container_of_const(ptr, struct mptcp_sock, sk.icsk_inet.sk)
#endif
/* the msk socket don't use the backlog, also account for the bulk
* free memory
*/
static inline int __mptcp_rmem(const struct sock *sk)
{
return atomic_read(&sk->sk_rmem_alloc) - READ_ONCE(mptcp_sk(sk)->rmem_released);
}
static inline int mptcp_win_from_space(const struct sock *sk, int space)
{
return __tcp_win_from_space(mptcp_sk(sk)->scaling_ratio, space);
@@ -401,7 +394,8 @@ static inline int mptcp_space_from_win(const struct sock *sk, int win)
static inline int __mptcp_space(const struct sock *sk)
{
return mptcp_win_from_space(sk, READ_ONCE(sk->sk_rcvbuf) - __mptcp_rmem(sk));
return mptcp_win_from_space(sk, READ_ONCE(sk->sk_rcvbuf) -
sk_rmem_alloc_get(sk));
}
static inline struct mptcp_data_frag *mptcp_send_head(const struct sock *sk)
@@ -1059,8 +1053,6 @@ void mptcp_event_pm_listener(const struct sock *ssk,
enum mptcp_event_type event);
bool mptcp_userspace_pm_active(const struct mptcp_sock *msk);
void __mptcp_fastopen_gen_msk_ackseq(struct mptcp_sock *msk, struct mptcp_subflow_context *subflow,
const struct mptcp_options_received *mp_opt);
void mptcp_fastopen_subflow_synack_set_params(struct mptcp_subflow_context *subflow,
struct request_sock *req);
int mptcp_nl_fill_addr(struct sk_buff *skb,
+18 -18
View File
@@ -802,9 +802,6 @@ void __mptcp_subflow_fully_established(struct mptcp_sock *msk,
subflow_set_remote_key(msk, subflow, mp_opt);
WRITE_ONCE(subflow->fully_established, true);
WRITE_ONCE(msk->fully_established, true);
if (subflow->is_mptfo)
__mptcp_fastopen_gen_msk_ackseq(msk, subflow, mp_opt);
}
static struct sock *subflow_syn_recv_sock(const struct sock *sk,
@@ -1271,7 +1268,12 @@ out:
subflow->map_valid = 0;
}
/* sched mptcp worker to remove the subflow if no more data is pending */
static bool subflow_is_done(const struct sock *sk)
{
return sk->sk_shutdown & RCV_SHUTDOWN || sk->sk_state == TCP_CLOSE;
}
/* sched mptcp worker for subflow cleanup if no more data is pending */
static void subflow_sched_work_if_closed(struct mptcp_sock *msk, struct sock *ssk)
{
struct sock *sk = (struct sock *)msk;
@@ -1281,8 +1283,18 @@ static void subflow_sched_work_if_closed(struct mptcp_sock *msk, struct sock *ss
inet_sk_state_load(sk) != TCP_ESTABLISHED)))
return;
if (skb_queue_empty(&ssk->sk_receive_queue) &&
!test_and_set_bit(MPTCP_WORK_CLOSE_SUBFLOW, &msk->flags))
if (!skb_queue_empty(&ssk->sk_receive_queue))
return;
if (!test_and_set_bit(MPTCP_WORK_CLOSE_SUBFLOW, &msk->flags))
mptcp_schedule_work(sk);
/* when the fallback subflow closes the rx side, trigger a 'dummy'
* ingress data fin, so that the msk state will follow along
*/
if (__mptcp_check_fallback(msk) && subflow_is_done(ssk) &&
msk->first == ssk &&
mptcp_update_rcv_data_fin(msk, READ_ONCE(msk->ack_seq), true))
mptcp_schedule_work(sk);
}
@@ -1842,11 +1854,6 @@ static void __subflow_state_change(struct sock *sk)
rcu_read_unlock();
}
static bool subflow_is_done(const struct sock *sk)
{
return sk->sk_shutdown & RCV_SHUTDOWN || sk->sk_state == TCP_CLOSE;
}
static void subflow_state_change(struct sock *sk)
{
struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(sk);
@@ -1873,13 +1880,6 @@ static void subflow_state_change(struct sock *sk)
subflow_error_report(sk);
subflow_sched_work_if_closed(mptcp_sk(parent), sk);
/* when the fallback subflow closes the rx side, trigger a 'dummy'
* ingress data fin, so that the msk state will follow along
*/
if (__mptcp_check_fallback(msk) && subflow_is_done(sk) && msk->first == sk &&
mptcp_update_rcv_data_fin(msk, READ_ONCE(msk->ack_seq), true))
mptcp_schedule_work(parent);
}
void mptcp_subflow_queue_clean(struct sock *listener_sk, struct sock *listener_ssk)
+1 -1
View File
@@ -460,7 +460,7 @@ META_COLLECTOR(int_sk_fwd_alloc)
*err = -1;
return;
}
dst->value = sk_forward_alloc_get(sk);
dst->value = READ_ONCE(sk->sk_forward_alloc);
}
META_COLLECTOR(int_sk_sndbuf)