I had been working hard to track down a problem related to a network device; the effort paid off, and the root cause was finally found.

The symptom: dmesg kept printing the following log line over and over:

page_pool_release_retry() stalled pool shutdown 1 inflight 2899 sec

Root cause: the Kafka Go library IBM/sarama leaks TCP connections (Client SeekBroker Connection Leak).

TL;DR: the causal chain from the leaked TCP connections to the page pool being unable to reclaim its pages:

sarama leaks TCP connections
 → the TCP sockets stay stuck in the TCP_CLOSE state
   → their sk_receive_queue still holds a FIN-ACK skb
     → that skb references a page allocated from the page pool
       → page_pool_release_retry() fails to reclaim the page
         → page_pool_release_retry() keeps retrying and keeps failing
           → the warning keeps being printed to dmesg

1. Reproducing the problem

The machine where the problem was found uses a Mellanox NIC. After repeated attempts, it turned out that a simple ip link set dev eth0 down is enough; then wait about 1 minute and check whether the log shows up in dmesg.

1.1. What happens when the network device is brought down?

  1. While tearing down its queues, the NIC driver returns pages to the page pool: page_pool_put_defragged_page().
  2. The page pool is then destroyed: page_pool_destroy().
  3. If there are still inflight pages, a worker is scheduled to retry once per second via page_pool_release_retry(), trying to reclaim the remaining pages and free the page pool (see the sketch below).
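
The tail end of that shutdown path can be sketched as follows (a paraphrased sketch of net/core/page_pool.c, not the verbatim kernel source):

// Paraphrased sketch: how page_pool_destroy() ends up scheduling
// page_pool_release_retry() when some pages are still inflight.
void page_pool_destroy(struct page_pool *pool)
{
    /* ... unlink from NAPI, drain caches, free fragments ... */

    if (!page_pool_release(pool))
        return;     /* no inflight pages: the pool is freed right away */

    /* Some pages are still held outside the pool (e.g. by skbs),
     * so retry once per second until they all come back.
     */
    pool->defer_start = jiffies;
    pool->defer_warn = jiffies + DEFER_WARN_INTERVAL;

    INIT_DELAYED_WORK(&pool->release_dw, page_pool_release_retry);
    schedule_delayed_work(&pool->release_dw, DEFER_TIME);
}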

1.2. Why wait 1 minute?

Look directly at the source of page_pool_release_retry():

// net/core/page_pool.c

#define DEFER_WARN_INTERVAL (60 * HZ)

static void page_pool_release_retry(struct work_struct *wq)
{
    struct delayed_work *dwq = to_delayed_work(wq);
    struct page_pool *pool = container_of(dwq, typeof(*pool), release_dw);
    int inflight;

    inflight = page_pool_release(pool);
    /* In rare cases, a driver bug may cause inflight to go negative.
     * Don't reschedule release if inflight is 0 or negative.
     * - If 0, the page_pool has been destroyed
     * - if negative, we will never recover
     * in both cases no reschedule is necessary.
     */
    if (inflight <= 0)
        return;

    /* Periodic warning */
    if (time_after_eq(jiffies, pool->defer_warn)) {
        int sec = (s32)((u32)jiffies - (u32)pool->defer_start) / HZ;

        pr_warn("%s() stalled pool shutdown %d inflight %d sec\n",
            __func__, inflight, sec);
        pool->defer_warn = jiffies + DEFER_WARN_INTERVAL;
    }

    /* Still not ready to be disconnected, retry later */
    schedule_delayed_work(&pool->release_dw, DEFER_TIME);
}

The warning is only printed once jiffies goes past pool->defer_warn (1 minute), so the first run of page_pool_release_retry() prints nothing.

Moreover:

  1. During the first minute there is a good chance that plenty of pages are still being reclaimed, so logging within that first minute would be meaningless and only add noise.
  2. The if (inflight <= 0) optimization added by Jason Xing does not kick in here, because inflight is greater than 0: some pages really have not been returned yet.

1.3. After waiting 1 minute, dmesg may still show nothing

Before digging out more details, there was no way to tell whether any pages were still outstanding.

1.4. The turning point of the investigation

Consulting AI was mostly of little help; it can offer ideas and directions, but that did not get me far here.

page_pool.c provides 2 tracepoints for tracking page allocation and release:

# bpfsnoop --show-func-proto -t 'page_pool_state_*'
Kernel tracepoints: (total 2)
void page_pool_state_hold(const struct page_pool *pool, const struct page *page, u32 hold);
void page_pool_state_release(const struct page_pool *pool, const struct page *page, u32 release);

However, even if these 2 tracepoints reveal which pages have not been released yet, they still cannot tell us why those pages have not been released.
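
For context: these tracepoints fire where the pool updates its two counters, pages_state_hold_cnt and pages_state_release_cnt, and the "inflight" number in the dmesg warning is simply their difference; that is also why the bpfsnoop command further below dumps both counters. A paraphrased sketch of the computation in net/core/page_pool.c:

// Paraphrased sketch: how the "inflight" count is derived from the two
// counters that the tracepoints above are attached to.
static s32 page_pool_inflight(struct page_pool *pool)
{
    u32 release_cnt = atomic_read(&pool->pages_state_release_cnt);
    u32 hold_cnt = READ_ONCE(pool->pages_state_hold_cnt);

    /* hold_cnt only grows; release_cnt catches up as pages come back. */
    return (s32)(hold_cnt - release_cnt);
}

In the bpfsnoop output further below, hold_cnt is 4096 and release_cnt is 4095, i.e. exactly 1 page is still inflight, matching the "1 inflight" in the dmesg warning.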

[Progress] Watching dmesg continuously, the log lines became fewer over time and eventually stopped appearing altogether.

In other words: during that period, some pages were returned and reclaimed.

Next step: bring out bpfsnoop:

# bpfsnoop \
    -k '(s)page_pool_put_*' \
    -t '(s)page_pool_state_*' \
        --filter-arg 'pool->destroy_cnt > 1' \
        --output-arg 'pool->destroy_cnt' \
        --output-arg 'pool->pages_state_hold_cnt' \
        --output-arg 'pool->pages_state_release_cnt.counter' \
        --output-arg '*page' -o inflight_page.bpfsnoop.log

← page_pool_put_defragged_page args=((struct page_pool *)pool=0xff110001fde50800, (struct page *)page=0xffd4000007fc18c0, (unsigned int)dma_sync_size=0xffffffff/4294967295, (bool)allow_direct=false) retval=(void) cpu=22 process=(262275:xxx-agent) timestamp=17:39:16.551295117
Arg attrs: (u64)'pool->destroy_cnt'=0x45/69, (u32)'pool->pages_state_hold_cnt'=0x1000/4096, (int)'pool->pages_state_release_cnt.counter'=4095, (struct page)'*page'={"flags": 6755398367313920, "": {"": {"": {"lru": {"next": 0xdead000000000040, "prev": 0xff110001fde50800}, "": {"__filler": 0xdead000000000040, "mlock_count": 4259645440}, "buddy_list": {"next": 0xdead000000000040, "prev": 0xff110001fde50800}, "pcp_list": {"next": 0xdead000000000040, "prev": 0xff110001fde50800}}, "mapping": 0x0, "": {"index": 8573562880, "share": 8573562880}, "private": 1}, "": {"pp_magic": 16045481047390945344, "pp": 0xff110001fde50800, "_pp_mapping_pad": 0, "dma_addr": 8573562880, "": {"dma_addr_upper": 1, "pp_frag_count": {"counter": 1}}}, "": {"compound_head": 16045481047390945344}, "": {"pgmap": 0xdead000000000040, "zone_device_data": 0xff110001fde50800}, "callback_head": {"next": 0xdead000000000040, "func": 0xff110001fde50800}}, "": {"_mapcount": {"counter": -1}, "page_type": 4294967295}, "_refcount": {"counter": 1}, "memcg_data": 0}
Func stack:
  page_pool_put_defragged_page+0x5                      ; net/core/page_pool.c:640
  skb_free_head+0x55                                    ; net/core/skbuff.c:953
  skb_release_data+0x159                                ; net/core/skbuff.c:998
  __kfree_skb+0x2b                                      ; net/core/skbuff.c:1068
  __tcp_close+0x93                                      ; include/linux/skbuff.h:2065
  inet_release+0x44                                     ; net/ipv4/af_inet.c:435
  __sock_release+0x40                                   ; net/socket.c:660
  sock_close+0x15                                       ; net/socket.c:1423
  __fput+0xfe                                           ; fs/file_table.c:385
  ____fput+0xe                                          ; fs/file_table.c:413
  task_work_run+0x65                                    ; arch/x86/include/asm/jump_label.h:27
  do_exit+0x298                                         ; kernel/exit.c:884
  do_group_exit+0x35                                    ; kernel/exit.c:1006
  get_signal+0x95f                                      ; kernel/signal.c:2902
  arch_do_signal_or_restart+0x39
  exit_to_user_mode_loop+0x9a                           ; kernel/entry/common.c:176
  exit_to_user_mode_prepare+0xa5                        ; kernel/entry/common.c:210
  syscall_exit_to_user_mode+0x29
  do_syscall_64+0x62                                    ; arch/x86/entry/common.c:88
  entry_SYSCALL_64_after_hwframe+0x78                   ; arch/x86/entry/entry_64.S:121

After a while, analyzing the bpfsnoop log file revealed the call stack above: some pages are returned via __kfree_skb() when a socket is released.

2. Analyzing the network stack behavior

The call stack above involves TCP sockets, specifically the stage where a TCP socket is being released.

2.1. Why does __tcp_close() free skbs?

Look at the source of __tcp_close() to see why it calls __kfree_skb():

// net/ipv4/tcp.c

void __tcp_close(struct sock *sk, long timeout)
{
    struct sk_buff *skb;
    int data_was_unread = 0;
    int state;

    WRITE_ONCE(sk->sk_shutdown, SHUTDOWN_MASK);

    if (sk->sk_state == TCP_LISTEN) {
        tcp_set_state(sk, TCP_CLOSE);

        /* Special case. */
        inet_csk_listen_stop(sk);

        goto adjudge_to_death;
    }

    /*  We need to flush the recv. buffs.  We do this only on the
     *  descriptor close, not protocol-sourced closes, because the
     *  reader process may not have drained the data yet!
     */
    while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
        u32 len = TCP_SKB_CB(skb)->end_seq - TCP_SKB_CB(skb)->seq;

        if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
            len--;
        data_was_unread += len;
        __kfree_skb(skb);
    }

    /* If socket has been already reset (e.g. in tcp_reset()) - kill it. */
    if (sk->sk_state == TCP_CLOSE)
        goto adjudge_to_death;

    /*...*/

out:
    bh_unlock_sock(sk);
    local_bh_enable();
}

So it is flushing sk->sk_receive_queue.

2.2. How does __kfree_skb() return the page to the page pool?

The call stack alone does not explain why __kfree_skb() ends up returning pages to the page pool.

Dig straight into the source of __kfree_skb():

// net/core/skbuff.c

/*
__kfree_skb()
|-->skb_release_all()
    |-->skb_release_data()
        |-->skb_free_head()
            |-->skb_pp_recycle()
                |-->napi_pp_put_page()
                    |-->page_pool_put_full_page()
*/

void __kfree_skb(struct sk_buff *skb)
{
    skb_release_all(skb, SKB_DROP_REASON_NOT_SPECIFIED, false);
    kfree_skbmem(skb);
}
EXPORT_SYMBOL(__kfree_skb);

/* Free everything but the sk_buff shell. */
static void skb_release_all(struct sk_buff *skb, enum skb_drop_reason reason,
                bool napi_safe)
{
    skb_release_head_state(skb);
    if (likely(skb->head))
        skb_release_data(skb, reason, napi_safe);
}

static void skb_release_data(struct sk_buff *skb, enum skb_drop_reason reason,
                 bool napi_safe)
{
    struct skb_shared_info *shinfo = skb_shinfo(skb);
    int i;

    if (skb->cloned &&
        atomic_sub_return(skb->nohdr ? (1 << SKB_DATAREF_SHIFT) + 1 : 1,
                  &shinfo->dataref))
        goto exit;

    if (skb_zcopy(skb)) {
        bool skip_unref = shinfo->flags & SKBFL_MANAGED_FRAG_REFS;

        skb_zcopy_clear(skb, true);
        if (skip_unref)
            goto free_head;
    }

    for (i = 0; i < shinfo->nr_frags; i++)
        napi_frag_unref(&shinfo->frags[i], skb->pp_recycle, napi_safe);

free_head:
    if (shinfo->frag_list)
        kfree_skb_list_reason(shinfo->frag_list, reason);

    skb_free_head(skb, napi_safe);
exit:
    /* When we clone an SKB we copy the reycling bit. The pp_recycle
     * bit is only set on the head though, so in order to avoid races
     * while trying to recycle fragments on __skb_frag_unref() we need
     * to make one SKB responsible for triggering the recycle path.
     * So disable the recycling bit if an SKB is cloned and we have
     * additional references to the fragmented part of the SKB.
     * Eventually the last SKB will have the recycling bit set and it's
     * dataref set to 0, which will trigger the recycling
     */
    skb->pp_recycle = 0;
}

static void skb_free_head(struct sk_buff *skb, bool napi_safe)
{
    unsigned char *head = skb->head;

    if (skb->head_frag) {
        if (skb_pp_recycle(skb, head, napi_safe))
            return;
        skb_free_frag(head);
    } else {
        skb_kfree_head(head, skb_end_offset(skb));
    }
}

static bool skb_pp_recycle(struct sk_buff *skb, void *data, bool napi_safe)
{
    if (!IS_ENABLED(CONFIG_PAGE_POOL) || !skb->pp_recycle)
        return false;
    return napi_pp_put_page(virt_to_page(data), napi_safe);
}

bool napi_pp_put_page(struct page *page, bool napi_safe)
{
    bool allow_direct = false;
    struct page_pool *pp;

    page = compound_head(page);

    /* page->pp_magic is OR'ed with PP_SIGNATURE after the allocation
     * in order to preserve any existing bits, such as bit 0 for the
     * head page of compound page and bit 1 for pfmemalloc page, so
     * mask those bits for freeing side when doing below checking,
     * and page_is_pfmemalloc() is checked in __page_pool_put_page()
     * to avoid recycling the pfmemalloc page.
     */
    if (unlikely((page->pp_magic & ~0x3UL) != PP_SIGNATURE))
        return false;

    pp = page->pp;

    /* Allow direct recycle if we have reasons to believe that we are
     * in the same context as the consumer would run, so there's
     * no possible race.
     * __page_pool_put_page() makes sure we're not in hardirq context
     * and interrupts are enabled prior to accessing the cache.
     */
    if (napi_safe || in_softirq()) {
        const struct napi_struct *napi = READ_ONCE(pp->p.napi);

        allow_direct = napi &&
            READ_ONCE(napi->list_owner) == smp_processor_id();
    }

    /* Driver set this to memory recycling info. Reset it on recycle.
     * This will *not* work for NIC using a split-page memory model.
     * The page will be returned to the pool here regardless of the
     * 'flipped' fragment being in use or not.
     */
    page_pool_put_full_page(pp, page, allow_direct);

    return true;
}
EXPORT_SYMBOL(napi_pp_put_page);

The key conditions here are:

  1. skb->head_frag
  2. skb->pp_recycle

Is there a way to walk the TCP sockets and inspect the skbs sitting in sk->sk_receive_queue?

2.3. Trying BPF iter/tcp

I had AI generate the BPF code and tweaked it a little:

//go:build ignore
// SPDX-License-Identifier: GPL-2.0
/* Copyright 2025 Leon Hwang */

#include "bpf_all.h"

SEC("iter/tcp")
int iter_tcp(struct bpf_iter__tcp *ctx)
{
    struct seq_file *seq = ctx->meta->seq;
    struct sock_common *skc = ctx->sk_common;
    struct tcp_sock *tp;
    struct sock *sk;
    struct sk_buff_head *queue;
    struct sk_buff *skb;
    int qlen, cnt = 0, cnt_pp = 0;

    if (!skc)
        return 0;

    tp = bpf_skc_to_tcp_sock(skc);
    if (!tp)
        return 0;

    sk   = (struct sock *)tp;
    queue = &sk->sk_receive_queue;

    qlen = BPF_CORE_READ(queue, qlen);

    skb = (struct sk_buff *)BPF_CORE_READ(queue, next);
    for (int i = 0; i < 100 && skb != (struct sk_buff *)queue; i++) {
        cnt++;
        cnt_pp += BPF_CORE_READ_BITFIELD_PROBED(skb, pp_recycle);

        skb = (struct sk_buff *)BPF_CORE_READ(skb, next);
    }

    if (cnt == 0)
        return 0;

    BPF_SEQ_PRINTF(seq, "state=%d src=%pI4:%u dst=%pI4:%u\n",
               skc->skc_state,
               &skc->skc_rcv_saddr, skc->skc_num,
               &skc->skc_daddr, bpf_ntohs(skc->skc_dport));

    BPF_SEQ_PRINTF(seq, "  rx_queue: qlen=%d iterated=%d (pp_recycle=%d)\n",
               qlen, cnt, cnt_pp);

    return 0;
}

But when it ran, it produced nothing at all; presumably because tcp_set_state() unhashes a socket from the TCP hash tables once it enters TCP_CLOSE, so the iter/tcp iterator never visits these leaked sockets.

Another approach was needed.

2.4. Walking TCP sockets with a kernel module

A kernel module that walks all TCP sockets of a given PID is bound to work, and it can pull out every piece of information we need.

#include <linux/module.h>
#include <linux/pid.h>
#include <linux/sched.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/fdtable.h>
#include <net/sock.h>
#include <net/inet_sock.h>
#include <net/tcp.h>
#include <net/tcp_states.h>
#include <linux/ip.h>
#include <linux/tcp.h>
#include <linux/inet.h>
#include <net/ipv6.h>
#include <net/page_pool/helpers.h>

static pid_t target_pid;

static const char *tcp_state_str(int state)
{
    switch (state) {
    case TCP_ESTABLISHED: return "ESTABLISHED";
    case TCP_SYN_SENT:    return "SYN_SENT";
    case TCP_SYN_RECV:    return "SYN_RECV";
    case TCP_FIN_WAIT1:   return "FIN_WAIT1";
    case TCP_FIN_WAIT2:   return "FIN_WAIT2";
    case TCP_TIME_WAIT:   return "TIME_WAIT";
    case TCP_CLOSE:       return "CLOSE";
    case TCP_CLOSE_WAIT:  return "CLOSE_WAIT";
    case TCP_LAST_ACK:    return "LAST_ACK";
    case TCP_LISTEN:      return "LISTEN";
    case TCP_CLOSING:     return "CLOSING";
    case TCP_NEW_SYN_RECV: return "NEW_SYN_RECV";
    default:              return "UNKNOWN";
    }
}

static void log_pp_recycled_skb(struct sock *sk, unsigned long ino,
                unsigned int qlen, struct sk_buff *skb)
{
    struct inet_sock *inet = inet_sk(sk);
    struct tcp_sock *tp = tcp_sk(sk);
    struct skb_shared_info *shinfo = skb_shinfo(skb);
    char src_ip[64] = {0};
    char dst_ip[64] = {0};
    u16 src_port = 0, dst_port = 0;
    unsigned int i;

    if (sk->sk_family == AF_INET) {
        snprintf(src_ip, sizeof(src_ip), "%pI4", &inet->inet_saddr);
        snprintf(dst_ip, sizeof(dst_ip), "%pI4", &inet->inet_daddr);
        src_port = ntohs(inet->inet_sport);
        dst_port = ntohs(inet->inet_dport);
#if IS_ENABLED(CONFIG_IPV6)
    } else if (sk->sk_family == AF_INET6) {
        snprintf(src_ip, sizeof(src_ip), "%pI6c", &sk->sk_v6_rcv_saddr);
        snprintf(dst_ip, sizeof(dst_ip), "%pI6c", &sk->sk_v6_daddr);
        src_port = ntohs(inet->inet_sport);
        dst_port = ntohs(inet->inet_dport);
#endif
    }

    pr_info("pid %d socket 0x%llx ino=%lu state=%u/%s shutdown=0x%x err=%d(soft=%d) linger2=%d dead=%u: receive_queue(qlen=%u): skb 0x%llx pp_recycle=%d tcp_flags=0x%02x cloned=%u head_frag=%u %s:%u->%s:%u len=%u data_len=%u nr_frags=%u\n",
        target_pid, (unsigned long long)sk, ino, sk->sk_state, tcp_state_str(sk->sk_state),
        sk->sk_shutdown, sk->sk_err, sk->sk_err_soft, tp->linger2, sock_flag(sk, SOCK_DEAD),
        qlen, (unsigned long long)skb, skb->pp_recycle,
        TCP_SKB_CB(skb)->tcp_flags,
        skb->cloned, skb->head_frag,
        src_ip, src_port, dst_ip, dst_port, skb->len, skb->data_len, shinfo->nr_frags);

    /* Log linear buffer head page if present */
    if (skb->head) {
        struct page *head_page = virt_to_head_page(skb->head);
        dma_addr_t head_dma = page_pool_get_dma_addr(head_page);
        pr_info("  pid %d head: page=0x%llx pfn=%lu data_off=%u headlen=%u dma=0x%llx\n",
            target_pid, (unsigned long long)head_page, page_to_pfn(head_page),
            (unsigned int)(skb->data - skb->head), skb_headlen(skb),
            (unsigned long long)head_dma);
    }

    for (i = 0; i < shinfo->nr_frags; i++) {
        const skb_frag_t *frag = &shinfo->frags[i];
        struct page *page = skb_frag_page(frag);
        dma_addr_t dma = page_pool_get_dma_addr(page);

        pr_info("  pid %d frag[%u]: page=0x%llx pfn=%lu off=%u size=%u dma=0x%llx\n",
            target_pid, i, (unsigned long long)page, page_to_pfn(page),
            skb_frag_off(frag), skb_frag_size(frag),
            (unsigned long long)dma);
    }
}

static void inspect_socket(struct socket *sock, unsigned long ino)
{
    struct sock *sk = sock->sk;
    struct sk_buff *skb;
    unsigned int qlen;
    int owned_before;

    if (!sk || sock->type != SOCK_STREAM || sk->sk_protocol != IPPROTO_TCP || sk->sk_state != TCP_CLOSE)
        return;

    owned_before = sock_owned_by_user(sk);
    lock_sock(sk);

    /* Log if we had to wait for lock */
    if (owned_before)
        pr_info("pid %d socket %p ino=%lu: WAITED for lock (was owned=%d)\n",
            target_pid, sk, ino, owned_before);

    qlen = skb_queue_len(&sk->sk_receive_queue);

    /* Check if there's backlog that will be processed */
    if (sk->sk_backlog.tail) {
        pr_info("pid %d socket %p ino=%lu: HAS BACKLOG (tail=0x%llx)\n",
            target_pid, sk, ino, (unsigned long long)sk->sk_backlog.tail);
    }

    skb_queue_walk(&sk->sk_receive_queue, skb) {
        log_pp_recycled_skb(sk, ino, qlen, skb);
    }
    release_sock(sk);
}

static bool inspect_fd_entry(struct files_struct *files, unsigned int fd)
{
    struct fdtable *fdt;
    struct file __rcu **fdentry;
    struct file *file;
    struct socket *sock;
    unsigned long ino = 0;

    rcu_read_lock();
    fdt = files_fdtable(files);
    if (fd >= fdt->max_fds) {
        rcu_read_unlock();
        return false;
    }

    fdentry = fdt->fd + array_index_nospec(fd, fdt->max_fds);
    file = rcu_dereference_raw(*fdentry);
    if (!file || !get_file_rcu(file)) {
        rcu_read_unlock();
        return true;
    }
    if (unlikely(files_fdtable(files) != fdt) ||
        unlikely(rcu_dereference_raw(*fdentry) != file)) {
        fput(file);
        rcu_read_unlock();
        return true;
    }
    rcu_read_unlock();

    if (file_inode(file))
        ino = file_inode(file)->i_ino;

    sock = sock_from_file(file);
    if (sock)
        inspect_socket(sock, ino);
    fput(file);

    return true;
}

static void walk_task_sockets(struct files_struct *files)
{
    unsigned int fd;

    for (fd = 0; inspect_fd_entry(files, fd); fd++)
        ;
}

static int __init qinspect_init(void)
{
    struct pid *pid;
    struct task_struct *task;
    struct files_struct *files = NULL;
    int ret = 0;

    pid = find_get_pid(target_pid);
    if (!pid)
        return -ESRCH;

    task = get_pid_task(pid, PIDTYPE_PID);
    put_pid(pid);
    if (!task)
        return -ESRCH;

    task_lock(task);
    files = task->files;
    if (files)
        atomic_inc(&files->count);
    task_unlock(task);

    if (!files) {
        ret = -ENOENT;
        goto out_put_task;
    }

    walk_task_sockets(files);

    atomic_dec(&files->count);

out_put_task:
    put_task_struct(task);
    return ret;
}

static void __exit qinspect_exit(void) { }

module_param(target_pid, int, 0444);
module_init(qinspect_init);
module_exit(qinspect_exit);
MODULE_LICENSE("GPL");
MODULE_AUTHOR("Leon Hwang <[email protected]>");
MODULE_DESCRIPTION("TCP Socket sk_receive_queue Inspection Module");

It needs a Makefile:

KDIR ?= /lib/modules/$(shell uname -r)/build

KMOD_NAME := tcp_rxq
KMOD := $(KMOD_NAME).ko

obj-m := $(KMOD_NAME).o
$(KMOD_NAME)-objs := $(KMOD_NAME).kmod.o

.PHONY: all clean

all:
    $(MAKE) -C $(KDIR) M=$(CURDIR) modules

clean:
    $(MAKE) -C $(KDIR) M=$(CURDIR) clean

run:
    if [ -z $(PID) ]; then exit 1; fi
    insmod $(KMOD) target_pid=$(PID)
    rmmod $(KMOD_NAME)
    dmesg | grep $(PID)

Usage: make run PID=$(pidof xxx-agent)

Running it produces:

[102694.540568] pid 286275 socket 0xff110002e0c53600 ino=4741242 state=7/CLOSE shutdown=0x3 err=32(soft=0) linger2=0 dead=0: receive_queue(qlen=1): skb 0xff110002b6aa6100 pp_recycle=1 tcp_flags=0x11 cloned=0 head_frag=1 10.x.x.x:43542->10.y.y.y:9093 len=0 data_len=0 nr_frags=0
[102694.540576]   pid 286275 head: page=0xffd400000f7bd040 pfn=4058945 data_off=118 headlen=0 dma=0x3def41000

The key pieces of information:

  1. Quite a few sockets are in the TCP_CLOSE state.
  2. sk->sk_receive_queue is holding a FIN-ACK skb (tcp_flags=0x11).
  3. The destination port is 9093.
  4. pp_recycle=1.
  5. The skb references a page.

At this point, the dmesg log lines are fully explained.

But we are not done yet:

  1. Why did those sockets end up in TCP_CLOSE?
  2. What is the root cause?
  3. How do we fix the problem at its root?
  4. What else could be improved?

2.5. Why the sockets end up in TCP_CLOSE

Bring out bpfsnoop again:

# bpfsnoop \
        -k sk_stream_wait_close \
        -k '(s)tcp_reset' \
        -k '(s)tcp_done' \
        -k '(s)tcp_done_with_error' \
        -k '(b)__tcp_close' \
                --output-arg 'sk->sk_receive_queue.qlen' \
                --output-arg '&sk->sk_receive_queue' \
                --output-arg 'sk->sk_receive_queue.next' \
                --output-arg 'sk->sk_receive_queue.prev' \
                --output-arg 'sk->sk_receive_queue.next->pp_recycle' \
                --output-arg 'ip42(&sk->__sk_common.skc_daddr)' \
                --output-arg 'port(&sk->__sk_common.skc_dport)' \
                --output-arg 'sk->__sk_common.skc_num' \
        -o tcp_close.bpfsnoop.4.log

← tcp_done_with_error args=((struct sock *)sk=0xff1100011a2f0900, (int)err=32) retval=(void) cpu=45 process=(0:swapper/45) timestamp=15:06:41.253928962
Arg attrs: (__u32)'sk->sk_receive_queue.qlen'=0x1/1, (struct sk_buff_head *)'&sk->sk_receive_queue'=0xff1100011a2f09d8, (struct sk_buff *)'sk->sk_receive_queue.next'=0xff1100012619a100, (struct sk_buff *)'sk->sk_receive_queue.prev'=0xff1100012619a100, (__u8)'sk->sk_receive_queue.next->pp_recycle'=0x1, (unsigned int *)'ip42(&sk->__sk_common.skc_daddr)'=[10.x.x.x,10.y.y.y], (short unsigned int *)'port(&sk->__sk_common.skc_dport)'=9092, (__u16)'sk->__sk_common.skc_num'=0x0/0
Func stack:
  tcp_done_with_error+0x5                               ; net/ipv4/tcp_input.c:4441
  bpf_trampoline_6442479037+0x39
  tcp_reset+0x5                                         ; net/ipv4/tcp_input.c:4456
  tcp_rcv_state_process+0x20e                           ; net/ipv4/tcp_input.c:6696
  tcp_v4_do_rcv+0xd3                                    ; net/ipv4/tcp_ipv4.c:1757
  tcp_v4_rcv+0xbe2                                      ; net/ipv4/tcp_ipv4.c:2166
  ip_protocol_deliver_rcu+0x3c                          ; net/ipv4/ip_input.c:205
  ip_local_deliver_finish+0x72                          ; net/ipv4/ip_input.c:237
  ip_local_deliver+0x6c                                 ; net/ipv4/ip_input.c:257
  ip_sublist_rcv_finish+0x77                            ; include/net/dst.h:477
  ip_sublist_rcv+0x17c                                  ; net/ipv4/ip_input.c:640
  ip_list_rcv+0x102                                     ; net/ipv4/ip_input.c:675
  __netif_receive_skb_list_core+0x22d                   ; net/core/dev.c:5639
  netif_receive_skb_list_internal+0x19e                 ; net/core/dev.c:5741
  napi_complete_done+0x74                               ; include/net/gro.h:448
  mlx5e_napi_poll+0x173
  __napi_poll+0x33                                      ; net/core/dev.c:6600
  net_rx_action+0x18a                                   ; net/core/dev.c:6669
  handle_softirqs+0xe8                                  ; arch/x86/include/asm/jump_label.h:27
  __irq_exit_rcu+0x77                                   ; kernel/softirq.c:612
  irq_exit_rcu+0xe                                      ; kernel/softirq.c:676
  common_interrupt+0xa4
  asm_common_interrupt+0x27                             ; arch/x86/include/asm/idtentry.h:678
  cpuidle_enter_state+0xda                              ; drivers/cpuidle/cpuidle.c:291
  cpuidle_enter+0x2e
  call_cpuidle+0x23                                     ; kernel/sched/idle.c:135
  cpuidle_idle_call+0x11d                               ; kernel/sched/idle.c:219
  do_idle+0x82                                          ; kernel/sched/idle.c:284
  cpu_startup_entry+0x2a                                ; kernel/sched/idle.c:379
  start_secondary+0x129                                 ; arch/x86/kernel/smpboot.c:211
  secondary_startup_64_no_verify+0x18f                  ; arch/x86/kernel/head_64.S:449

Here err=32 is EPIPE:

// net/ipv4/tcp_input.c

void tcp_done_with_error(struct sock *sk, int err)
{
    /* This barrier is coupled with smp_rmb() in tcp_poll() */
    WRITE_ONCE(sk->sk_err, err);
    smp_wmb();

    tcp_write_queue_purge(sk);
    tcp_done(sk);

    if (!sock_flag(sk, SOCK_DEAD))
        sk_error_report(sk);
}
EXPORT_SYMBOL(tcp_done_with_error);

/* When we get a reset we do this. */
void tcp_reset(struct sock *sk, struct sk_buff *skb)
{
    int err;

    trace_tcp_receive_reset(sk);

    /* mptcp can't tell us to ignore reset pkts,
     * so just ignore the return value of mptcp_incoming_options().
     */
    if (sk_is_mptcp(sk))
        mptcp_incoming_options(sk, skb);

    /* We want the right error as BSD sees it (and indeed as we do). */
    switch (sk->sk_state) {
    case TCP_SYN_SENT:
        err = ECONNREFUSED;
        break;
    case TCP_CLOSE_WAIT:
        err = EPIPE;
        break;
    case TCP_CLOSE:
        return;
    default:
        err = ECONNRESET;
    }
    tcp_done_with_error(sk, err);
}

So the sequence is:

  1. The socket first receives a FIN and moves to TCP_CLOSE_WAIT (see the tcp_fin() sketch below).
  2. The socket then receives the FIN-ACK packet, which is queued on sk->sk_receive_queue.
  3. The socket finally receives an RST and moves to TCP_CLOSE.
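
Step 1 is handled by tcp_fin(); a paraphrased sketch of the relevant state transition in net/ipv4/tcp_input.c (not the verbatim source):

/* Paraphrased sketch: tcp_fin() moves an ESTABLISHED socket to
 * CLOSE_WAIT when the peer's FIN arrives. The skb that carried the
 * FIN can stay queued on sk->sk_receive_queue until the reader drains
 * it or the socket is closed, which is what the kernel module
 * observed above.
 */
void tcp_fin(struct sock *sk)
{
    inet_csk_schedule_ack(sk);

    sk->sk_shutdown |= RCV_SHUTDOWN;
    sock_set_flag(sk, SOCK_DONE);

    switch (sk->sk_state) {
    case TCP_SYN_RECV:
    case TCP_ESTABLISHED:
        /* Move to CLOSE_WAIT */
        tcp_set_state(sk, TCP_CLOSE_WAIT);
        break;
    /* ... the other states are omitted here ... */
    }
}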

3. The TCP connection leak in sarama

After discussing with colleagues, we confirmed that 9093 is the Kafka server port, and that the library used to connect to Kafka is sarama.

A look through sarama's issue list shows that someone has already reported a related problem:

Still, the issue needed further investigation.

3.1. Adding more logging to sarama

Patch the sarama code directly:

  1. Log every place in broker.go that touches the TCP connection.
  2. Log every place in client.go that touches brokers.
  3. Enable sarama's logger in the userspace process.
diff --git a/broker.go b/broker.go
index d0d5b87..9cc4ca2 100644
--- a/broker.go
+++ b/broker.go
@@ -155,6 +155,36 @@ func NewBroker(addr string) *Broker {
    return &Broker{id: -1, addr: addr}
 }

+func (b *Broker) connInfo(conn net.Conn) string {
+   if conn == nil {
+       return "not connected"
+   }
+
+   laddr, raddr := conn.LocalAddr(), conn.RemoteAddr()
+   return fmt.Sprintf("laddr=%s raddr=%s", laddr.String(), raddr.String())
+}
+
+func (b *Broker) debugInfo() string {
+   var sb strings.Builder
+
+   fmt.Fprintf(&sb, "id=0x%x addr=%s %s", b.id, b.addr, b.connInfo(b.conn))
+
+   return sb.String()
+}

 // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
 // waiting for the connection to complete. This means that any subsequent operations on the broker will
 // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
@@ -202,16 +232,26 @@ func (b *Broker) Open(conf *Config) error {
        }()
        dialer := conf.getDialer()
        b.conn, b.connErr = dialer.Dial("tcp", b.addr)
+       Logger.Printf("[DBG] [BROKER] connecting conn[%s] err[%v]\n", b.connInfo(b.conn), b.connErr)
        if b.connErr != nil {
            Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
            b.conn = nil
            atomic.StoreInt32(&b.opened, 0)
            return
        }
        if conf.Net.TLS.Enable {
            b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
        }

+       Logger.Printf("[DBG] [BROKER] connected %s\n", b.debugInfo())
+
        b.conn = newBufConn(b.conn)
        b.conf = conf

@@ -241,7 +281,7 @@ func (b *Broker) Open(conf *Config) error {
            b.connErr = b.authenticateViaSASLv0()

            if b.connErr != nil {
-               err = b.conn.Close()
+               err = b.closeConn()
                if err == nil {
                    DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
                } else {
@@ -262,7 +302,7 @@ func (b *Broker) Open(conf *Config) error {
            if b.connErr != nil {
                close(b.responses)
                <-b.done
-               err = b.conn.Close()
+               err = b.closeConn()
                if err == nil {
                    DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
                } else {
@@ -278,6 +318,8 @@ func (b *Broker) Open(conf *Config) error {
        } else {
            DebugLogger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
        }
+
+       Logger.Printf("[DBG] [BROKER] opened %s\n", b.debugInfo())
    })

    return nil
@@ -290,6 +332,13 @@ func (b *Broker) ResponseSize() int {
    return len(b.responses)
 }

+func (b *Broker) closeConn() error {
+   dbgInfo := b.debugInfo()
+   err := b.conn.Close()
+   Logger.Printf("[DBG] [BROKER] closed %s (err=%v)\n", dbgInfo, err)
+   return err
+}
+
 // Connected returns true if the broker is connected and false otherwise. If the broker is not
 // connected but it had tried to connect, the error from that connection attempt is also returned.
 func (b *Broker) Connected() (bool, error) {
@@ -329,7 +378,7 @@ func (b *Broker) Close() error {
    close(b.responses)
    <-b.done

-   err := b.conn.Close()
+   err := b.closeConn()

    b.conn = nil
    b.connErr = nil
diff --git a/client.go b/client.go
index 2decba7..20644b6 100644
--- a/client.go
+++ b/client.go
@@ -3,6 +3,8 @@ package sarama
 import (
    "context"
    "errors"
+   "fmt"
+   "io"
    "math"
    "math/rand"
    "net"
@@ -236,6 +238,43 @@ func (client *client) Config() *Config {
    return client.conf
 }

+func (client *client) printAllBrokersInfo(sb *strings.Builder) {
+   sb.WriteString("[DBG] [CLIENT] ALL brokers:\n")
+
+   sb.WriteString("[DBG] [CLIENT] Seed brokers:\n")
+   for i, broker := range client.seedBrokers {
+       if broker == nil {
+           fmt.Fprintf(sb, "  [DBG] [CLIENT] seed broker #%d: <nil>\n", i)
+       } else {
+           fmt.Fprintf(sb, "  [DBG] [CLIENT] seed broker #%d: %s\n", i, broker.debugInfo())
+       }
+   }
+
+   sb.WriteString("[DBG] [CLIENT] Dead seed brokers:\n")
+   for i, broker := range client.deadSeeds {
+       if broker == nil {
+           fmt.Fprintf(sb, "  [DBG] [CLIENT] dead seed broker #%d: <nil>\n", i)
+       } else {
+           fmt.Fprintf(sb, "  [DBG] [CLIENT] dead seed broker #%d: %s\n", i, broker.debugInfo())
+       }
+   }
+
+   sb.WriteString("[DBG] [CLIENT] Registered brokers:\n")
+   client.printBrokersInfo(sb)
+
+   Logger.Printf(sb.String())
+}
+
+func (client *client) printBrokersInfo(w io.Writer) {
+   for id, broker := range client.brokers {
+       if broker == nil {
+           fmt.Fprintf(w, "  [DBG] [CLIENT] broker id#%d: <nil>\n", id)
+       } else {
+           fmt.Fprintf(w, "  [DBG] [CLIENT] broker id#%d: %s\n", id, broker.debugInfo())
+       }
+   }
+}
+
 func (client *client) Brokers() []*Broker {
    client.lock.RLock()
    defer client.lock.RUnlock()
@@ -243,6 +282,11 @@ func (client *client) Brokers() []*Broker {
    for _, broker := range client.brokers {
        brokers = append(brokers, broker)
    }
+
+   var sb strings.Builder
+   sb.WriteString("[DBG] [CLIENT] copied current brokers:\n")
+   client.printBrokersInfo(&sb)
+   Logger.Printf(sb.String())
    return brokers
 }

@@ -308,6 +352,15 @@ func (client *client) Close() error {
    defer client.lock.Unlock()
    DebugLogger.Println("Closing Client")

+   var sb strings.Builder
+   sb.WriteString("[DBG] [CLIENT] Closing brokers:\n")
+   client.printAllBrokersInfo(&sb)
+   defer func() {
+       sb.WriteString("[DBG] [CLIENT] After closing brokers:\n")
+       client.printAllBrokersInfo(&sb)
+       Logger.Printf(sb.String())
+   }()
+
    for _, broker := range client.brokers {
        safeAsyncClose(broker)
    }
@@ -518,6 +571,15 @@ func (client *client) RefreshBrokers(addrs []string) error {
    client.lock.Lock()
    defer client.lock.Unlock()

+   var sb strings.Builder
+   sb.WriteString("[DBG] [CLIENT] Refreshing brokers, closing existing brokers:\n")
+   client.printAllBrokersInfo(&sb)
+   defer func() {
+       sb.WriteString("[DBG] [CLIENT] After refreshing brokers:\n")
+       client.printAllBrokersInfo(&sb)
+       Logger.Printf(sb.String())
+   }()
+
    for _, broker := range client.brokers {
        safeAsyncClose(broker)
    }
@@ -608,6 +670,7 @@ func (client *client) deregisterController() {
    if controller, ok := client.brokers[client.controllerID]; ok {
        _ = controller.Close()
        delete(client.brokers, client.controllerID)
+       Logger.Printf("[DBG] [CLIENT] deregistered controller broker #%d at %s", controller.ID(), controller.debugInfo())
    }
 }

@@ -723,6 +786,18 @@ func (client *client) randomizeSeedBrokers(addrs []string) {
 func (client *client) updateBroker(brokers []*Broker) {
    currentBroker := make(map[int32]*Broker, len(brokers))

+   var sb strings.Builder
+   sb.WriteString("[DBG] [CLIENT] Updating brokers with the following list:\n")
+   for _, broker := range brokers {
+       sb.WriteString(fmt.Sprintf("  [DBG] [CLIENT] broker to update: %s\n", broker.debugInfo()))
+   }
+   client.printAllBrokersInfo(&sb)
+   defer func() {
+       sb.WriteString("[DBG] [CLIENT] After updating brokers:\n")
+       client.printAllBrokersInfo(&sb)
+       Logger.Printf(sb.String())
+   }()
+
    for _, broker := range brokers {
        currentBroker[broker.ID()] = broker
        if client.brokers[broker.ID()] == nil { // add new broker
@@ -753,6 +828,16 @@ func (client *client) registerBroker(broker *Broker) {
        return
    }

+   var sb strings.Builder
+   sb.WriteString("[DBG] [CLIENT] Registering broker:\n")
+   sb.WriteString(fmt.Sprintf("  [DBG] [CLIENT] broker to register: %s\n", broker.debugInfo()))
+   client.printAllBrokersInfo(&sb)
+   defer func() {
+       sb.WriteString("[DBG] [CLIENT] After registering broker, current brokers:\n")
+       client.printAllBrokersInfo(&sb)
+       Logger.Printf(sb.String())
+   }()
+
    if client.brokers[broker.ID()] == nil {
        client.brokers[broker.ID()] = broker
        DebugLogger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
@@ -769,6 +854,16 @@ func (client *client) deregisterBroker(broker *Broker) {
    client.lock.Lock()
    defer client.lock.Unlock()

+   var sb strings.Builder
+   sb.WriteString("[DBG] [CLIENT] Deregistering broker:\n")
+   sb.WriteString(fmt.Sprintf("  [DBG] [CLIENT] broker to deregister: %s\n", broker.debugInfo()))
+   client.printAllBrokersInfo(&sb)
+   defer func() {
+       sb.WriteString("[DBG] [CLIENT] After deregistering broker, current brokers:\n")
+       client.printAllBrokersInfo(&sb)
+       Logger.Printf(sb.String())
+   }()
+
    _, ok := client.brokers[broker.ID()]
    if ok {
        Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
@@ -788,6 +883,11 @@ func (client *client) resurrectDeadBrokers() {
    Logger.Printf("client/brokers resurrecting %d dead seed brokers", len(client.deadSeeds))
    client.seedBrokers = append(client.seedBrokers, client.deadSeeds...)
    client.deadSeeds = nil
+
+   var sb strings.Builder
+   sb.WriteString("[DBG] [CLIENT] After resurrecting dead seed brokers, current brokers:\n")
+   client.printAllBrokersInfo(&sb)
+   Logger.Printf(sb.String())
 }

 // LeastLoadedBroker returns the broker with the least pending requests.
@@ -796,6 +896,15 @@ func (client *client) LeastLoadedBroker() *Broker {
    client.lock.RLock()
    defer client.lock.RUnlock()

+   var sb strings.Builder
+   sb.WriteString("[DBG] [CLIENT] Checking least loaded broker among current brokers:\n")
+   client.printBrokersInfo(&sb)
+   defer func() {
+       sb.WriteString("[DBG] [CLIENT] Finished checking least loaded broker, current brokers\n")
+       client.printBrokersInfo(&sb)
+       Logger.Printf(sb.String())
+   }()
+
    var leastLoadedBroker *Broker
    pendingRequests := math.MaxInt
    for _, broker := range client.brokers {

The kernel module above confirmed that:

  1. The seed brokers' TCP connections had already entered TCP_CLOSE, but userspace never closed them.
  2. The registered brokers' TCP connections have the same problem.

At the same time, the following log line showed up:

client/metadata got error from broker 1193 while fetching metadata: write tcp 10.x.x.x:24180->10.y.y.y.y:9093: write: broken pipe

This log line also explains why the dmesg messages above decreased and eventually disappeared: sarama picks brokers at random to send requests to; those brokers' TCP connections are no longer usable, so the kernel returns -EPIPE on write, and sarama then closes that broker's TCP connection.

But why does the Kafka server actively close the TCP connections in the first place? That still needs further research.

3.2. Fixing sarama

At a suitable point, check whether the socket behind the TCP connection has a pending sk->sk_err:

func (b *Broker) getSockOpt(conn *net.TCPConn, opt int) (int, error) {
    file, err := conn.File()
    if err != nil {
        return 0, fmt.Errorf("failed to get file from connection: %w", err)
    }
    defer file.Close() // Close the duplicated file descriptor

    sockErr, err := syscall.GetsockoptInt(int(file.Fd()), syscall.SOL_SOCKET, opt)
    if err != nil {
        return 0, fmt.Errorf("failed to get sockopt %d: %w", opt, err)
    }
    return sockErr, nil
}

func (b *Broker) getSockError() error {
    b.lock.Lock()
    defer b.lock.Unlock()
    if b.connTCP == nil {
        return nil
    }

    sockErr, err := b.getSockOpt(b.connTCP, syscall.SO_ERROR)
    if err != nil {
        return fmt.Errorf("failed to get socket error: %w", err)
    }
    if sockErr != 0 {
        return syscall.Errno(sockErr)
    }
    return nil
}

func (client *client) checkSeedBrokersHealth(brokers []*Broker) []*Broker {
    if len(brokers) == 0 {
        return nil
    }

    healthyBrokers := make([]*Broker, 0, len(brokers))
    for _, broker := range brokers {
        if err := broker.getSockError(); err != nil {
            Logger.Printf("client/seedbrokers close seed broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err)
            safeAsyncClose(broker)
            continue
        }

        healthyBrokers = append(healthyBrokers, broker)
    }

    return healthyBrokers
}

func (client *client) checkBrokersHealth() {
    for id, broker := range client.brokers {
        if err := broker.getSockError(); err != nil {
            Logger.Printf("client/brokers close broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err)
            safeAsyncClose(broker)
            delete(client.brokers, id)
        }
    }

    client.seedBrokers = client.checkSeedBrokersHealth(client.seedBrokers)
    client.deadSeeds = client.checkSeedBrokersHealth(client.deadSeeds)
}

func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
    if client.Closed() {
        return
    }
    client.lock.Lock()
    defer client.lock.Unlock()

    // Check health of existing brokers, including seed brokers, dead
    // seed brokers, and registered brokers.
    // - if error occurred on broker's tcp socket, close the tcp
    //   connection.
    // - if it's seed broker or dead seed broker, remove it from
    //   the list.
    client.checkBrokersHealth()
}

With checkBrokersHealth() added to updateMetadata(), sarama now closes TCP connections that are already in TCP_CLOSE every 10 minutes (on each metadata refresh).

The kernel module above confirms that only a small number of TCP sockets are ever in TCP_CLOSE now, and they are closed within 10 minutes.

For more details of the fix, see the sarama PR:

4. Improving the kernel network stack

  1. If we cannot control userspace behavior, can the kernel be changed so that system administrators have a way to handle this kind of problem?
  2. Can the dmesg noise be reduced, or even eliminated entirely?

4.1. Plan: a new net.ipv4.tcp_purge_receive_queue sysctl

The core change:

diff --git a/net/ipv4/tcp_input.c b/net/ipv4/tcp_input.c
index 198f8a0d37be0..a01d39c37be2f 100644
--- a/net/ipv4/tcp_input.c
+++ b/net/ipv4/tcp_input.c
@@ -4648,6 +4648,7 @@ EXPORT_IPV6_MOD(tcp_done_with_error);
 /* When we get a reset we do this. */
 void tcp_reset(struct sock *sk, struct sk_buff *skb)
 {
+   const struct net *net = sock_net(sk);
     int err;

     trace_tcp_receive_reset(sk);
@@ -4664,6 +4665,21 @@ void tcp_reset(struct sock *sk, struct sk_buff *skb)
         err = ECONNREFUSED;
         break;
     case TCP_CLOSE_WAIT:
+       /* RFC9293 3.10.7.4. Other States
+        *   Second, check the RST bit:
+        *     CLOSE-WAIT STATE
+        *
+        * If the RST bit is set, then any outstanding RECEIVEs and
+        * SEND should receive "reset" responses.  All segment queues
+        * should be flushed.  Users should also receive an unsolicited
+        * general "connection reset" signal.  Enter the CLOSED state,
+        * delete the TCB, and return.
+        *
+        * If net.ipv4.tcp_purge_receive_queue is enabled,
+        * sk_receive_queue will be flushed too.
+        */
+       if (unlikely(net->ipv4.sysctl_tcp_purge_receive_queue))
+           skb_queue_purge(&sk->sk_receive_queue);
         err = EPIPE;
         break;
     case TCP_CLOSE:

Indeed, the reference for this change is Section 3.10.7.4 of RFC 9293.

4.2. Plan: a new page_pool_release_stalled tracepoint

The core change:

diff --git a/net/core/page_pool.c b/net/core/page_pool.c
index 1a5edec485f1..59a85887921c 100644
--- a/net/core/page_pool.c
+++ b/net/core/page_pool.c
@@ -1218,8 +1218,11 @@ static void page_pool_release_retry(struct work_struct *wq)
            (!netdev || netdev == NET_PTR_POISON)) {
                int sec = (s32)((u32)jiffies - (u32)pool->defer_start) / HZ;

-               pr_warn("%s() stalled pool shutdown: id %u, %d inflight %d sec\n",
-                       __func__, pool->user.id, inflight, sec);
+               if (sec >= DEFER_WARN_INTERVAL / HZ && sec < DEFER_WARN_INTERVAL * 2 / HZ)
+                       pr_warn("%s() stalled pool shutdown: id %u, %d inflight %d sec\n",
+                               __func__, pool->user.id, inflight, sec);
+               else
+                       trace_page_pool_release_stalled(pool, inflight, sec);
                pool->defer_warn = jiffies + DEFER_WARN_INTERVAL;
        }

The idea behind this change:

  1. Keep at least one pr_warn().
  2. After that, use the page_pool_release_stalled tracepoint to check whether there are still inflight pages (a possible event definition is sketched below).
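
The patch above only calls trace_page_pool_release_stalled(); the event itself still has to be defined. One possible definition (my sketch, not part of the patch), following the existing events in include/trace/events/page_pool.h:

/* Sketch of a possible event definition; the fields mirror the
 * trace_page_pool_release_stalled(pool, inflight, sec) call site above.
 */
TRACE_EVENT(page_pool_release_stalled,

    TP_PROTO(const struct page_pool *pool, int inflight, int sec),

    TP_ARGS(pool, inflight, sec),

    TP_STRUCT__entry(
        __field(const struct page_pool *, pool)
        __field(int, inflight)
        __field(int, sec)
    ),

    TP_fast_assign(
        __entry->pool = pool;
        __entry->inflight = inflight;
        __entry->sec = sec;
    ),

    TP_printk("page_pool=%p inflight=%d stalled %d sec",
          __entry->pool, __entry->inflight, __entry->sec)
);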

There may be better ways to improve this; it is still being explored.

5. Wrap-up

First of all, thanks to the following folks for their enthusiastic discussions throughout the investigation:

  • Jason Xing
  • Lance Yang
  • Jiayuan Chen
  • Gray Liang

Next, bpfsnoop played a key role in the investigation, sparing me from writing fairly complex bpftrace scripts.

After confirming the network stack behavior, I used AI to quickly write a Python script that reproduces that behavior.

Finally, once the sarama PR is merged, please update sarama to the latest commit to avoid the TCP connection leak.