udp: use a separate rx queue for packet reception
Under UDP flood the sk_receive_queue spinlock is heavily contended.
This patch tries to reduce the contention on that lock by adding a
second receive queue to UDP sockets: recvmsg() looks first in this
queue and, only if it is empty, falls back to fetching the data from
sk_receive_queue. The latter is spliced into the newly added queue
every time the receive path has to acquire the sk_receive_queue lock.

The accounting of forward allocated memory is still protected by the
sk_receive_queue lock, so udp_rmem_release() needs to acquire both
locks when the forward deficit is flushed.

In specific scenarios we can still end up acquiring and releasing the
sk_receive_queue lock multiple times; that will be covered by the
next patch.

Suggested-by: Eric Dumazet <edumazet@google.com>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Acked-by: Eric Dumazet <edumazet@google.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
commit 2276f58ac5
parent 65101aeca5
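To make the locking scheme described above easier to follow, here is a minimal, self-contained user-space sketch of the same two-queue pattern (a private reader list refilled by splicing the producer-contended list over in one shot). It is not the kernel code: it uses pthread mutexes instead of spinlocks, the names two_queue_sock/enqueue_rx/dequeue_reader are purely illustrative, and it simplifies one detail — the kernel splices with skb_queue_splice_tail_init() even into a non-empty reader_queue, while this sketch only refills when its reader list is empty.

/* Sketch of the two-queue receive pattern: the consumer drains a private
 * "reader" list and only touches the producer-contended "rx" list to move
 * it over in a single splice.  Illustrative user-space code only.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct pkt {
        int id;
        struct pkt *next;
};

struct two_queue_sock {
        pthread_mutex_t rx_lock;        /* models sk_receive_queue.lock */
        struct pkt *rx_head, *rx_tail;

        pthread_mutex_t reader_lock;    /* models reader_queue.lock */
        struct pkt *reader_head, *reader_tail;
};

/* Producer side (softirq analogue): append under the contended rx lock. */
static void enqueue_rx(struct two_queue_sock *s, struct pkt *p)
{
        p->next = NULL;
        pthread_mutex_lock(&s->rx_lock);
        if (s->rx_tail)
                s->rx_tail->next = p;
        else
                s->rx_head = p;
        s->rx_tail = p;
        pthread_mutex_unlock(&s->rx_lock);
}

/* Consumer side: try the private reader list first; if it is empty, take
 * the rx lock briefly and splice the whole rx list over in one shot.
 */
static struct pkt *dequeue_reader(struct two_queue_sock *s)
{
        struct pkt *p;

        pthread_mutex_lock(&s->reader_lock);
        if (!s->reader_head) {
                pthread_mutex_lock(&s->rx_lock);
                s->reader_head = s->rx_head;
                s->reader_tail = s->rx_tail;
                s->rx_head = s->rx_tail = NULL;
                pthread_mutex_unlock(&s->rx_lock);
        }
        p = s->reader_head;
        if (p) {
                s->reader_head = p->next;
                if (!s->reader_head)
                        s->reader_tail = NULL;
        }
        pthread_mutex_unlock(&s->reader_lock);
        return p;
}

int main(void)
{
        struct two_queue_sock s = {
                .rx_lock = PTHREAD_MUTEX_INITIALIZER,
                .reader_lock = PTHREAD_MUTEX_INITIALIZER,
        };
        struct pkt *p;

        for (int i = 0; i < 3; i++) {
                struct pkt *n = malloc(sizeof(*n));
                n->id = i;
                enqueue_rx(&s, n);
        }
        while ((p = dequeue_reader(&s)) != NULL) {
                printf("got packet %d\n", p->id);
                free(p);
        }
        return 0;
}

The point of the pattern is that the consumer takes the contended rx lock at most once per batch of packets instead of once per packet, which is exactly what limits sk_receive_queue.lock contention under flood.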
include/linux/udp.h
@@ -80,6 +80,9 @@ struct udp_sock {
                                             struct sk_buff *skb,
                                             int nhoff);
 
+        /* udp_recvmsg try to use this before splicing sk_receive_queue */
+        struct sk_buff_head     reader_queue ____cacheline_aligned_in_smp;
+
         /* This field is dirtied by udp_recvmsg() */
         int             forward_deficit;
 };
include/net/udp.h
@@ -249,13 +249,8 @@ void udp_destruct_sock(struct sock *sk);
 void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len);
 int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb);
 void udp_skb_destructor(struct sock *sk, struct sk_buff *skb);
-static inline struct sk_buff *
-__skb_recv_udp(struct sock *sk, unsigned int flags, int noblock, int *peeked,
-               int *off, int *err)
-{
-        return __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
-                                   udp_skb_destructor, peeked, off, err);
-}
+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+                               int noblock, int *peeked, int *off, int *err);
 static inline struct sk_buff *skb_recv_udp(struct sock *sk, unsigned int flags,
                                            int noblock, int *err)
 {
include/net/udplite.h
@@ -26,8 +26,8 @@ static __inline__ int udplite_getfrag(void *from, char *to, int offset,
 /* Designate sk as UDP-Lite socket */
 static inline int udplite_sk_init(struct sock *sk)
 {
+        udp_init_sock(sk);
         udp_sk(sk)->pcflag = UDPLITE_BIT;
-        sk->sk_destruct = udp_destruct_sock;
         return 0;
 }
 
net/ipv4/udp.c (138 changed lines)
@@ -1167,19 +1167,24 @@ int udp_sendpage(struct sock *sk, struct page *page, int offset,
 static void udp_rmem_release(struct sock *sk, int size, int partial)
 {
         struct udp_sock *up = udp_sk(sk);
+        struct sk_buff_head *sk_queue;
         int amt;
 
         if (likely(partial)) {
                 up->forward_deficit += size;
                 size = up->forward_deficit;
                 if (size < (sk->sk_rcvbuf >> 2) &&
-                    !skb_queue_empty(&sk->sk_receive_queue))
+                    !skb_queue_empty(&up->reader_queue))
                         return;
         } else {
                 size += up->forward_deficit;
         }
         up->forward_deficit = 0;
 
+        /* acquire the sk_receive_queue for fwd allocated memory scheduling */
+        sk_queue = &sk->sk_receive_queue;
+        spin_lock(&sk_queue->lock);
+
         sk->sk_forward_alloc += size;
         amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
         sk->sk_forward_alloc -= amt;
@@ -1188,9 +1193,14 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
                 __sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT);
 
         atomic_sub(size, &sk->sk_rmem_alloc);
+
+        /* this can save us from acquiring the rx queue lock on next receive */
+        skb_queue_splice_tail_init(sk_queue, &up->reader_queue);
+
+        spin_unlock(&sk_queue->lock);
 }
 
-/* Note: called with sk_receive_queue.lock held.
+/* Note: called with reader_queue.lock held.
  * Instead of using skb->truesize here, find a copy of it in skb->dev_scratch
  * This avoids a cache line miss while receive_queue lock is held.
  * Look at __udp_enqueue_schedule_skb() to find where this copy is done.
@@ -1306,10 +1316,12 @@ EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb);
 void udp_destruct_sock(struct sock *sk)
 {
         /* reclaim completely the forward allocated memory */
+        struct udp_sock *up = udp_sk(sk);
         unsigned int total = 0;
         struct sk_buff *skb;
 
-        while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+        skb_queue_splice_tail_init(&sk->sk_receive_queue, &up->reader_queue);
+        while ((skb = __skb_dequeue(&up->reader_queue)) != NULL) {
                 total += skb->truesize;
                 kfree_skb(skb);
         }
@@ -1321,6 +1333,7 @@ EXPORT_SYMBOL_GPL(udp_destruct_sock);
 
 int udp_init_sock(struct sock *sk)
 {
+        skb_queue_head_init(&udp_sk(sk)->reader_queue);
         sk->sk_destruct = udp_destruct_sock;
         return 0;
 }
@@ -1338,6 +1351,26 @@ void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len)
 }
 EXPORT_SYMBOL_GPL(skb_consume_udp);
 
+static struct sk_buff *__first_packet_length(struct sock *sk,
+                                             struct sk_buff_head *rcvq,
+                                             int *total)
+{
+        struct sk_buff *skb;
+
+        while ((skb = skb_peek(rcvq)) != NULL &&
+               udp_lib_checksum_complete(skb)) {
+                __UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
+                                IS_UDPLITE(sk));
+                __UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
+                                IS_UDPLITE(sk));
+                atomic_inc(&sk->sk_drops);
+                __skb_unlink(skb, rcvq);
+                *total += skb->truesize;
+                kfree_skb(skb);
+        }
+        return skb;
+}
+
 /**
  *      first_packet_length     - return length of first packet in receive queue
  *      @sk: socket
@@ -1347,22 +1380,20 @@ EXPORT_SYMBOL_GPL(skb_consume_udp);
  */
 static int first_packet_length(struct sock *sk)
 {
-        struct sk_buff_head *rcvq = &sk->sk_receive_queue;
+        struct sk_buff_head *rcvq = &udp_sk(sk)->reader_queue;
+        struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
         struct sk_buff *skb;
         int total = 0;
         int res;
 
         spin_lock_bh(&rcvq->lock);
-        while ((skb = skb_peek(rcvq)) != NULL &&
-               udp_lib_checksum_complete(skb)) {
-                __UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
-                                IS_UDPLITE(sk));
-                __UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
-                                IS_UDPLITE(sk));
-                atomic_inc(&sk->sk_drops);
-                __skb_unlink(skb, rcvq);
-                total += skb->truesize;
-                kfree_skb(skb);
+        skb = __first_packet_length(sk, rcvq, &total);
+        if (!skb && !skb_queue_empty(sk_queue)) {
+                spin_lock(&sk_queue->lock);
+                skb_queue_splice_tail_init(sk_queue, rcvq);
+                spin_unlock(&sk_queue->lock);
+
+                skb = __first_packet_length(sk, rcvq, &total);
         }
         res = skb ? skb->len : -1;
         if (total)
@@ -1400,6 +1431,79 @@ int udp_ioctl(struct sock *sk, int cmd, unsigned long arg)
 }
 EXPORT_SYMBOL(udp_ioctl);
 
+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+                               int noblock, int *peeked, int *off, int *err)
+{
+        struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
+        struct sk_buff_head *queue;
+        struct sk_buff *last;
+        long timeo;
+        int error;
+
+        queue = &udp_sk(sk)->reader_queue;
+        flags |= noblock ? MSG_DONTWAIT : 0;
+        timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+        do {
+                struct sk_buff *skb;
+
+                error = sock_error(sk);
+                if (error)
+                        break;
+
+                error = -EAGAIN;
+                *peeked = 0;
+                do {
+                        int _off = *off;
+
+                        spin_lock_bh(&queue->lock);
+                        skb = __skb_try_recv_from_queue(sk, queue, flags,
+                                                        udp_skb_destructor,
+                                                        peeked, &_off, err,
+                                                        &last);
+                        if (skb) {
+                                spin_unlock_bh(&queue->lock);
+                                *off = _off;
+                                return skb;
+                        }
+
+                        if (skb_queue_empty(sk_queue)) {
+                                spin_unlock_bh(&queue->lock);
+                                goto busy_check;
+                        }
+
+                        /* refill the reader queue and walk it again */
+                        _off = *off;
+                        spin_lock(&sk_queue->lock);
+                        skb_queue_splice_tail_init(sk_queue, queue);
+                        spin_unlock(&sk_queue->lock);
+
+                        skb = __skb_try_recv_from_queue(sk, queue, flags,
+                                                        udp_skb_destructor,
+                                                        peeked, &_off, err,
+                                                        &last);
+                        spin_unlock_bh(&queue->lock);
+                        if (skb) {
+                                *off = _off;
+                                return skb;
+                        }
+
+busy_check:
+                        if (!sk_can_busy_loop(sk))
+                                break;
+
+                        sk_busy_loop(sk, flags & MSG_DONTWAIT);
+                } while (!skb_queue_empty(sk_queue));
+
+                /* sk_queue is empty, reader_queue may contain peeked packets */
+        } while (timeo &&
+                 !__skb_wait_for_more_packets(sk, &error, &timeo,
+                                              (struct sk_buff *)sk_queue));
+
+        *err = error;
+        return NULL;
+}
+EXPORT_SYMBOL_GPL(__skb_recv_udp);
+
 /*
  *      This should be easy, if there is something there we
  *      return it, otherwise we block.
@@ -1490,7 +1594,8 @@ int udp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int noblock,
         return err;
 
 csum_copy_err:
-        if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+        if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+                                 udp_skb_destructor)) {
                 UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS, is_udplite);
                 UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
         }
@@ -2325,6 +2430,9 @@ unsigned int udp_poll(struct file *file, struct socket *sock, poll_table *wait)
         unsigned int mask = datagram_poll(file, sock, wait);
         struct sock *sk = sock->sk;
 
+        if (!skb_queue_empty(&udp_sk(sk)->reader_queue))
+                mask |= POLLIN | POLLRDNORM;
+
         sock_rps_record_flow(sk);
 
         /* Check for false positives due to checksum errors */
net/ipv6/udp.c
@@ -455,7 +455,8 @@ int udpv6_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
         return err;
 
 csum_copy_err:
-        if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+        if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+                                 udp_skb_destructor)) {
                 if (is_udp4) {
                         UDP_INC_STATS(sock_net(sk),
                                       UDP_MIB_CSUMERRORS, is_udplite);