Like的世界

个人总结与随想

用ACE实现One Loop Per Thread

| Comments

one loop per thread 即每个线程运行一个独立的事件循环 (event loop),或称为 one event loop per thread 更准确。而且事件循环线程之间通常不存在同步或互斥的关系, 并发连接的处理能力随CPU核心数而扩展 (scalable)。

ACE 拥有多种实现的Reactor:

  • ACE_Select_Reactor : 基于 select 的实现。
  • ACE_Dev_Poll_Reactor : 基于 Linux epoll 或 BSD /dev/poll 的实现。
  • ACE_WFMO_Reactor : 基于 Windows WaitForMultipleObjects 的实现

它们都可用于实现 one loop per thread 模式。 相比 ACE_TP_Reactor

  • ACE_TP_Reactor 继承于 ACE_Select_Reactor加锁 保证多线程竞争同一 Reactor 的安全。并行可扩展能力受限,且最大支持 1024 个描述符。
  • 基于 ACE_Dev_Poll_Reactor 的 one loop er thread,每个线程拥有独立的 Reactor,线程之间不存在竞争。充分发挥并行能力,且最大支持几十万甚至百万个描述符。

以下以 ACE_Select_Reactor 为例浅析实现的关键代码。更完备示例代码请参看我的 ace_echod

实现浅析

事件循环函数

event_loop 将作为线程函数运行于独立线程中,它与 ACE 教科书基本一样:

1
2
3
4
5
6
7
8
9
ACE_THR_FUNC_RETURN event_loop(ACE_Reactor* reactor)
{
    reactor->owner(ACE_OS::thr_self()); // 将reactor “绑定” 本线程。
    while (!reactor->reactor_event_loop_done()) {
        reactor->run_reactor_event_loop();
    }

    return 0;
}

事件循环管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class Event_Loop_Manager {
public:
    // 为简化,忽略 ctor 、dtor 和 close

    bool open(unsigned threads)
    {
        // 创建Reactor数组。
        reactors_.reset(new ACE_Reactor*[threads]);
        memset(reactors_.get(), 0, sizeof(ACE_Reactor*) * threads);

        // 创建线程ID数组
        tids_.reset(new ACE_thread_t[threads]);
        memset(tids_.get(), 0, sizeof(ACE_thread_t*) * threads);

        // 创建事件循环
        threads_ = threads;
        for (unsigned i = 0; i < threads_; ++i) {
            reactors_[i] = new ACE_Reactor(new ACE_Select_Reactor, 1), false);
            ACE_Thread_Manager::instance()->spawn(
                    (ACE_THR_FUNC) event_loop, reactors[i],
                    THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, &tid);
        }

        current_ = 0;
        return true;
    }

    /// 以简单循环方式返回 Reactor
    ACE_Reactor* reactor()
    {
        ACE_Reactor* r = reactors_[current_];
        current_ = (current_ + 1) % threads_;
        return r;
    }
protected:
    ACE_Auto_Basic_Array_Ptr<ACE_Reactor*> reactors_;
    ACE_Auto_Basic_Array_Ptr<ACE_thread_t> tids_;
    unsigned threads_;
    unsigned current_; // 当前分配 Reactor 索引
};

Echo Acceptor

据 ACE 的 Acceptor 模式, Echo_Handler 继承于 ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>,它与 ACE 教科书实现相似。

  • Echo_Acceptor 注册默认 Reactor ,其事件循环运行于主线程。
  • 创建 Echo_Handler 对象时,事件循环管理器以简单循环方式分配 Reactor 给新对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Echo_Acceptor: public ACE_Acceptor<Echo_Handler, ACE_SOCK_ACCEPTOR> {
public:
    int open(const addr_type& local_addr, int flags, int use_select, int reuse_addr)
    {
        flags_ = flags;
        use_select_ = use_select;
        reuse_addr_ = reuse_addr;
        peer_acceptor_addr_ = local_addr;

        if (peer_acceptor_.open(local_addr, reuse_addr) == -1)
            return -1;

        (void) peer_acceptor_.enable(ACE_NONBLOCK);
        ACE_Reactor::instance()->register_handler(
            this, ACE_Event_Handler::ACCEPT_MASK);

        this->reactor(ACE_Reactor::instance());

        // 按逻辑CPU数初始化事件循环(线程)数。
        return loops_.open(reactor_type, ACE_OS::num_processors()) ? 0 : -1;
    }

    virtual int make_svc_handler(handler_type*& sh)
    {
        if (!sh)
            ACE_NEW_RETURN(sh, handler_type, -1);

        // 事件循环管理器给 Echo_Handler 对象分配事件循环。
        sh->reactor(loops_.reactor());
        return 0;
    }

protected:
    Event_Loop_Manager loops_;
};

存在问题

  • ACE_Reactor::size 实现不一致,除 ACE_Dev_Poll_Reactor 返回当前注册的 handler 数量,其它实现返回handler表的容量。导致无法直接通过注册 handler 的数量,实现 Round-Robin,即均衡分配 handler
  • ACE_Reactor 无法获取当前是否等待事件(即空闲)状态,导致无法直接将 handler 分配到空闲事件循环中。

解决上述问题,要么直接修改 ACE 的实现,要么在具体 handler 实现中增加状态或统计变量,两者都需用原子变量来避免加锁。

Comments