存档

2015年8月 的存档

Ceph 深入解析(1) — Ceph 的消息处理架构

2015年8月14日 2 条评论

Ceph 的消息处理主要关联到以下几个类:

消息处理架构

架构上采用 Publish/subscribe(发布/订阅) 的设计模式.

模块说明

class Messenger

该类作为消息的发布者, 各个 Dispatcher 子类作为消息的订阅者, Messenger 收到消息之后,通过 Pipe 读取消息,然后转给 Dispatcher 处理

class SimpleMessenger

Messenger 接口的实现

class Dispatcher

该类是订阅者的基类,具体的订阅后端继承该类,初始化的时候通过 Messenger::add_dispatcher_tail/head 注册到 Messenger::dispatchers. 收到消息后,通知改类处理

class Accepter

监听 peer 的请求, 有新请求时, 调用 SimpleMessenger::add_accept_pipe() 创建新的 Pipe 到 SimpleMessenger::pipes 来处理该请求

class Pipe

用于消息的读取和发送,该类主要有两个组件,Pipe::Reader 和 Pipe::Writer, 分别用来处理 消息的读取和发送. 这两个类都是 class Thread 的子类,意味这每次处理消息都会有两个 线程被分别创建.

消息被 Pipe::Reader 读取后,该线程会通知注册到 Messenger::dispatchers 中的某一个 Dispatcher(如 Monitor) 处理, 处理完成之后将回复的消息放到 SimpleMessenger::Pipe::out_q 中,供 Pipe::Writer 来处理发送

class DispatchQueue

该类用来缓存收到的消息, 然后唤醒 DispatchQueue::dispatch_thread 线程找到后端的 Dispatch 处理消息

深入解析

流程

下面的代码涉及到的订阅子类以 Monitor 为例:

初始化

int main(int argc, char *argv[])
{
    // 创建一个 Messenger 对象,由于 Messenger 是抽象类,不能直接实例化,提供了一个
    // ::create 的方法来创建子类,目前 Ceph 所有模块使用 SimpleMessenger
    Messenger *messenger = Messenger::create(g_ceph_context,
                                             entity_name_t::MON(rank),
                                             "mon",
                                             0);
 
    /**
     * 执行 socket() -> bind() -> listen() 等一系列动作, 执行流程如下:
     SimpleMessenger::bind()
         --> Accepter::bind()
             socket() -> bind() -> listen()
    */
    err = messenger->bind(ipaddr);
 
    // 创建一个 Dispatch 的子类对象, 这里是 Monitor
    mon = new Monitor(g_ceph_context, g_conf->name.get_id(), store, 
                      messenger, &monmap);
 
    // 启动 Reaper 线程
    messenger->start();
 
    /**
     * a). 初始化 Monitor 模块
     * b). 通过 SimpleMessenger::add_dispatcher_tail() 注册自己到
     * SimpleMessenger::dispatchers 中, 流程如下:
     * Messenger::add_dispatcher_tail()
     *      --> ready()
     *        --> dispatch_queue.start()(新 DispatchQueue 线程)
              --> Accepter::start()(启动start线程)
     *            --> accept
     *                --> SimpleMessenger::add_accept_pipe
     *                    --> Pipe::start_reader
     *                        --> Pipe::reader()
     * 在 ready() 中: 通过 Messenger::reader(),
     * 1) DispatchQueue 线程会被启动,用于缓存收到的消息消息
     * 2) Accepter 线程启动,开始监听新的连接请求.
     */
    mon->init();
 
    // 进入 mainloop, 等待退出
    messenger->wait();
    return 0;
}

消息处理

收到连接请求

请求的监听和处理由 SimpleMessenger::ready –> Accepter::entry 实现

void SimpleMessenger::ready()
{
    // 启动 DispatchQueue 线程
    dispatch_queue.start();
 
    lock.Lock();
    // 启动 Accepter 线程监听客户端连接, 见下面的 Accepter::entry
    if (did_bind)
        accepter.start();
    lock.Unlock();
}
 
void *Accepter::entry()
{
    struct pollfd pfd;
    // listen_sd 是 Accepter::bind() 中创建绑定的 socket
    pfd.fd = listen_sd;
    pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
    while (!done) {
        int r = poll(&pfd, 1, -1);
        if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
            break;
        if (done) break;
        entity_addr_t addr;
        socklen_t slen = sizeof(addr.ss_addr());
        int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
        if (sd >= 0) {
            // 调用 SimpleMessenger::add_accept_pipe() 处理这个连接
            msgr->add_accept_pipe(sd);
        } 
    }
    return 0;
}

随后创建 Pipe() 开始消息的处理

Pipe *SimpleMessenger::add_accept_pipe(int sd)
{
    lock.Lock();
    Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
    p->sd = sd;
    p->pipe_lock.Lock();
    // 
    /**
     * 调用 Pipe::start_reader() 开始读取消息, 将会创建一个读线程开始处理.
     * Pipe::start_reader() --> Pipe::reader
     */
    p->start_reader();
    p->pipe_lock.Unlock();
    pipes.insert(p);
    accepting_pipes.insert(p);
    lock.Unlock();
    return p;
}

创建消息读取和发送线程

处理消息由 Pipe::start_reader() –> Pipe::reader() 开始,此时已经是在 Reader 线程中. 首先会调用 accept() 做一些简答的处理然后创建 Writer() 线程,等待发送回复 消息. 然后读取消息, 读取完成之后, 将收到的消息封装在 Message 中,交由 dispatch_queue() 处理.

dispatch_queue() 找到注册者,将消息转交给它处理,处理完成唤醒 Writer() 线程发送回复消息.

void Pipe::reader()
{
    /**
     * Pipe::accept() 会调用 Pipe::start_writer() 创建 wirter 线程, 进入 writer 线程
     * 后,会 cond.Wait() 等待被激活,激活的流程看下面的说明. Writer 线程的创建见后后面
     * Pipe::accept() 的分析
     */
    if (state == STATE_ACCEPTING) {
        accept();
    }
 
    while (state != STATE_CLOSED &&
           state != STATE_CONNECTING) {
        // 读取消息类型,某些消息会马上激活 writer 线程先处理
        if (tcp_read((char*)&tag, 1) < 0) {
            continue;
        }
        if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
            continue;
        }
        if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
            continue;
        }
        if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
            continue;
        }
        if (tag == CEPH_MSGR_TAG_ACK) {
            continue;
        }
        else if (tag == CEPH_MSGR_TAG_MSG) {
            // 收到 MSG 消息
            Message *m = 0;
            // 将消息读取到 new 到的 Message 对象
            int r = read_message(&m, auth_handler.get());
 
            // 先激活 writer 线程 ACK 这个消息
            cond.Signal();  // wake up writer, to ack this
 
            // 如果该次请求是可以延迟处理的请求,将 msg 放到 Pipe::DelayedDelivery::delay_queue, 
            // 后面通过相关模块再处理
            // 注意,一般来讲收到的消息分为三类:
            // 1. 直接可以在 reader 线程中处理,如上面的 CEPH_MSGR_TAG_ACK
            // 2. 正常处理, 需要将消息放入 DispatchQueue 中,由后端注册的消息处理,然后唤醒发送线程发送
            // 3. 延迟发送, 下面的这种消息, 由定时时间决定什么时候发送
            if (delay_thread) {
                utime_t release;
                if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
                    release = m->get_recv_stamp();
                    release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
                    lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
                }
                delay_thread->queue(release, m);
            } else {
                // 正常处理的消息,放到 Pipe::DispatchQueue *in_q 中, 以下是整个消息的流程
                // DispatchQueue::enqueue()
                //     --> mqueue.enqueue() -> cond.Signal()(激活唤醒 DispatchQueue::dispatch_thread 线程)
                //         --> DispatchQueue::dispatch_thread::entry() 该线程得到唤醒
                //             --> Messenger::ms_deliver_XXX
                //                 --> 具体的 Dispatch 实例, 如 Monitor::ms_dispatch()
                //                     --> Messenger::send_message()
                //                         --> SimpleMessenger::submit_message()
                //                             --> Pipe::_send()
                //                                 --> Pipe::out_q[].push_back(m) -> cond.Signal 激活 writer 线程
                //                                     --> ::sendmsg() // 发送到 socket
                in_q->enqueue(m, m->get_priority(), conn_id);
            }
        } 
 
        else if (tag == CEPH_MSGR_TAG_CLOSE) {
            cond.Signal();
            break;
        }
        else {
            ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl;
            pipe_lock.Lock();
            fault(true);
        }
    }
}

Pipe::accept() 做一些简单的协议检查和认证处理,之后创建 Writer() 线程: Pipe::start_writer() –> Pipe::Writer

int Pipe::accept()
{
    ldout(msgr->cct,10) << "accept" << dendl;
    // 检查自己和对方的协议版本等信息是否一致等操作
    // ......
 
    while (1) {
        // 协议检查等操作
        // ......
 
        /**
         * 通知注册者有新的 accept 请求过来,如果 Dispatcher 的子类有实现
         * Dispatcher::ms_handle_accept(),则会调用该方法处理
         */
        msgr->dispatch_queue.queue_accept(connection_state.get());
 
        // 发送 reply 和认证相关的消息
        // ......
 
        if (state != STATE_CLOSED) {
            /**
             * 前面的协议检查,认证等都完成之后,开始创建 Writer() 线程等待注册者
             * 处理完消息之后发送
             * 
             */
            start_writer();
        }
        ldout(msgr->cct,20) << "accept done" << dendl;
 
        /**
         * 如果该消息是延迟发送的消息, 且相关的发送线程没有启动,启动之
         * Pipe::maybe_start_delay_thread()
         *     --> Pipe::DelayedDelivery::entry()
         */
        maybe_start_delay_thread();
        return 0;   // success.
    }
}

随后 Writer 线程等待被唤醒发送回复消息

void Pipe::writer()
{
    while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
        if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
            (is_queued() || in_seq > in_seq_acked)) {
 
            // 对 keepalive, keepalive2, ack 包的处理
            // ......
 
            // 从 Pipe::out_q 中得到一个取出包准备发送
            Message *m = _get_next_outgoing();
            if (m) {
                // 对包进行一些加密处理
                m->encode(features, !msgr->cct->_conf->ms_nocrc);
 
                // 包头
                ceph_msg_header& header = m->get_header();
                ceph_msg_footer& footer = m->get_footer();
 
                // 取出要发送的二进制数据
                bufferlist blist = m->get_payload();
                blist.append(m->get_middle());
                blist.append(m->get_data());
 
                // 发送包: Pipe::write_message() --> Pipe::do_sendmsg --> ::sendmsg()
                ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl;
                int rc = write_message(header, footer, blist);
                m->put();
            }
            continue;
        }
 
        // 等待被 Reader 或者 Dispatcher 唤醒
        ldout(msgr->cct,20) << "writer sleeping" << dendl;
        cond.Wait(pipe_lock);
    }
}

消息的处理

Reader 线程将消息交给 dispatch_queue 处理,流程如下:

Pipe::reader() –> Pipe::in_q->enqueue()

void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
{
    Mutex::Locker l(lock);
    ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
    add_arrival(m);
    // 将消息按优先级放入 DispatchQueue::mqueue 中
    if (priority >= CEPH_MSG_PRIO_LOW) {
        mqueue.enqueue_strict(
            id, priority, QueueItem(m));
    } else {
        mqueue.enqueue(
            id, priority, m->get_cost(), QueueItem(m));
    }
    // 唤醒 DispatchQueue::entry() 处理消息
    cond.Signal();
}
 
void DispatchQueue::entry()
{
    while (true) {
        while (!mqueue.empty()) {
            QueueItem qitem = mqueue.dequeue();
            Message *m = qitem.get_message();
            /**
             * 交给 Messenger::ms_deliver_dispatch() 处理,后者会找到
             * Monitor/OSD 等的 ms_deliver_dispatch() 开始对消息的逻辑处理
             * Messenger::ms_deliver_dispatch()
             *     --> Monitor::ms_dispatch()
             */
            msgr->ms_deliver_dispatch(m);
        }
        if (stop)
            break;
 
        // 等待被 DispatchQueue::enqueue() 唤醒
        cond.Wait(lock);
    }
    lock.Unlock();
}

下面简单看一下在订阅者的模块中消息是怎样被放入 Pipe::out_q 中的:

Messenger::ms_deliver_dispatch()
    --> Monitor::ms_dispatch()
        --> Monitor::_ms_dispatch
            --> Monitor::dispatch
                --> Monitor::handle_mon_get_map
                    --> Monitor::send_latest_monmap
                        --> SimpleMessenger::send_message()
                            --> SimpleMessenger::_send_message()
                                --> SimpleMessenger::submit_message()
                                    --> Pipe::_send()
bool Monitor::_ms_dispatch(Message *m)
{
    ret = dispatch(s, m, src_is_mon);
 
    if (s) {
        s->put();
    }
 
    return ret;
}
 
bool Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon)
{
    switch (m->get_type()) {
    case CEPH_MSG_MON_GET_MAP:
        handle_mon_get_map(static_cast<MMonGetMap*>(m));
        break;
    // ......
    default:
        ret = false;
    }
    return ret;
}
 
void Monitor::handle_mon_get_map(MMonGetMap *m)
{
    send_latest_monmap(m->get_connection().get());
    m->put();
}
 
void Monitor::send_latest_monmap(Connection *con)
{
    bufferlist bl;
    monmap->encode(bl, con->get_features());
    /**
     * SimpleMessenger::send_message()
     *     --> SimpleMessenger::_send_message()
     *         --> SimpleMessenger::submit_message()
     *             --> Pipe::_send()
     */
    messenger->send_message(new MMonMap(bl), con);
}
 
void Pipe::_send(Message *m)
{
    assert(pipe_lock.is_locked());
    out_q[m->get_priority()].push_back(m);
    // 唤醒 Writer 线程
    cond.Signal();
}

总结

由上面的所有分析,除了订阅者/发布者设计模式,对网络包的处理上采用的是古老的 生产者消费者问题 线程模型,每次新的请求就会有创建一对收/发线程用来处理消息的接受 发送,如果有大规模的请求,线程的上下文切换会带来大量的开销,性能可能产生瓶颈。

不过在较新的 Ceph 版本中,新增加了两种新的消息模型: AsyncMessenger 和 XioMessenger 让 Ceph 消息处理得到改善.

相关的评测以后带来

分类: Storage 标签:

RadosGW 初体验

2015年8月3日 没有评论

介绍

Ceph 的最底层模块是对象存储,本质上就是一个 Rados, 如果不考虑用户体验,利用 rados 就可以使用 Ceph 对象存储. 例如:

$ rados -p test_pool get SI7W1FUI43H3I3ED9CWX test.data
  • test_pool: pool 名
  • SI7W1FUI43H3I3ED9CWX: object 名
  • test.data 保存到这里

和 RBD 以及 CephMDS 一样,RadosGW 提供了对 Radow 对象存储访问的更友好支持. 它允许用户通过 Restful API 的方式进行使用 RadowGW 。

从技术架构上讲, 和 RBD 一样, 它位于 Librados 之上, 而且正如它的名字所暗示的, 它是 Rados 的一个网关,通过 HTTP RestFul API, 对外提供对象存储的服务. 为了用户使用的方便性和兼容性,还提供了兼容 S3 和 Swfit 的接口.

以上,使得可以基于 RadosGW 构建云存储.

环境描述

假设已经有一个安装好的 Ceph 集群. 在一台新的节点上配置 RadowGW

节点信息如下:

主机名字
ceph.test.com
IP 地址
192.168.3.140

NOTE: 所有节点的操作系统都是 centos 7.

安装部署

安装配置 Apache

  1. 安装
$ sudo yum install -y httpd
  1. 配置: 编辑 /etc/httpd/conf/httpd.conf
# 设置 ServerName
ServerName ceph.test.com
 
# 添加下面内容
<IfModule !proxy_fcgi_module>
    LoadModule proxy_fcgi_module modules/mod_proxy_fcgi.so
</IfModule>
 
# 修改监听参数: 192.168.3.140 为本机 IP
Listen 192.168.3.140:80
  1. 重启 Apache
$ sudo systemctl restart httpd

安装配置 RadosGW

  1. 安装 RadosGW
$ sudo yum install -y radosgw-agent
  1. 创建用于 RadosGW 的用户等信息(在 ceph admin 节点执行)
$ sudo ceph-authtool --create-keyring /etc/ceph/ceph.client.radosgw.keyring
$ sudo chmod +r /etc/ceph/ceph.client.radosgw.keyring
$ sudo ceph-authtool /etc/ceph/ceph.client.radosgw.keyring -n client.radosgw.gateway --gen-key
$ sudo ceph-authtool -n client.radosgw.gateway --cap osd 'allow rwx' --cap mon 'allow rwx' /etc/ceph/ceph.client.radosgw.keyring
$ sudo ceph -k /etc/ceph/ceph.client.admin.keyring auth add client.radosgw.gateway -i /etc/ceph/ceph.client.radosgw.keyring

将这个 keyring 和 admin.keyring 复制到 ceph.test.com

$ scp /etc/ceph/ceph.client.radosgw.keyring root@ceph.test.com:/etc/ceph
$ scp /etc/ceph/ceph.client.admin.keyring root@ceph.test.com:/etc/ceph
  1. 创建 RadosGW 需要的 pool(在 ceph admin 节点)
$ ceph osd pool create .rgw 64 64
$ ceph osd pool create .rgw.root 64 64 
$ ceph osd pool create .rgw.control 64 64 
$ ceph osd pool create .rgw.gc 64 64
$ ceph osd pool create .rgw.buckets 64 64 
$ ceph osd pool create .rgw.buckets.index 64 64 
$ ceph osd pool create .log 64 64 
$ ceph osd pool create .intent-log 64 64 
$ ceph osd pool create .usage 64 64 
$ ceph osd pool create .users 64 64 
$ ceph osd pool create .users.email 64 64
$ ceph osd pool create .users.swift 64 64
$ ceph osd pool create .users.uid 64 64
  1. 修改并 gateway.client 的配置到各个节点.

将下列内容添加到 ceph 节点的各个配置文件(包括 RadosGW 节点)

[client.radosgw.gateway]
host = ceph
keyring = /etc/ceph/ceph.client.radosgw.keyring
rgw socket path = /var/run/ceph/ceph.radosgw.gateway.fastcgi.sock
log file = /var/log/radosgw/client.radosgw.gateway.log
rgw print continue = false
  1. 创建 RadosGW 运行目录
$ sudo mkdir -p /var/lib/ceph/radosgw/ceph-radosgw.gateway
$ sudo chown apache:apache /var/log/radosgw/client.radosgw.gateway.log
  1. 启动 RadosGW
$ /etc/init.d/ceph-radosgw start
  1. 将 RadosGW 的配置写入 Apache 中 a) 创建文件 /etc/httpd/conf.d/rgw.conf, 内容为:
<VirtualHost *:80>
ServerName localhost
DocumentRoot /var/www/html
 
ErrorLog /var/log/httpd/rgw_error.log
CustomLog /var/log/httpd/rgw_access.log combined
 
# LogLevel debug
RewriteEngine On
RewriteRule .* - [E=HTTP_AUTHORIZATION:%{HTTP:Authorization},L]
SetEnv proxy-nokeepalive 1
ProxyPass / unix:///var/run/ceph/ceph.radosgw.gateway.fastcgi.sock|fcgi://localhost:9000/
</VirtualHost>

b) 重启 Apache

$ sudo systemctl restart httpd

测试

创建用户

为了完成测试,分别创建一个用于 S3 和 Swift 的用户:

  1. 创建 S3 用户:
$ sudo radosgw-admin user create --uid="testuser" --display-name="First User"
{ "user_id": "testuser",
  ... ...
  "keys": [
        { "user": "testuser",
          "access_key": "EZU2MX4CJCIZILATWQSK",
          "secret_key": "4z18K+f7MZQop2Z99PxN2KKGSX9rb6KBd9ioW0D\/"},
  ... ...
  "temp_url_keys": []}
  1. 创建 Swift 用户: a) 创建 Swift 用户
$ sudo radosgw-admin subuser create --uid=testuser --subuser=testuser:swift
{ "user_id": "testuser",
  ... ...
  "keys": [
        { "user": "testuser",
          "access_key": "EZU2MX4CJCIZILATWQSK",
          "secret_key": "4z18K+f7MZQop2Z99PxN2KKGSX9rb6KBd9ioW0D\/"},
        { "user": "testuser:swift",
          "access_key": "SI7W1FUI43H3I3ED9CWX",
          "secret_key": ""}],
  ... ...
  "temp_url_keys": []}

b) 设置 Swift 用户的密钥:

$ sudo radosgw-admin subuser create --uid=testuser --subuser=testuser:swift
{ "user_id": "testuser",
  ... ...
  "keys": [
        { "user": "testuser",
          "access_key": "EZU2MX4CJCIZILATWQSK",
          "secret_key": "4z18K+f7MZQop2Z99PxN2KKGSX9rb6KBd9ioW0D\/"},
        { "user": "testuser:swift",
          "access_key": "SI7W1FUI43H3I3ED9CWX",
          "secret_key": ""}],
  "swift_keys": [
        { "user": "testuser:swift",
          "secret_key": "yTXiN+2y1Uf6j+CXioZwhqzwCPhOgqVblm2iShj+"}],
  ... ...
  "temp_url_keys": []}

测试

下面的例子使用 s3cmd 测试用 S3 接口连接使用 RadosGW:

NOTE: s3cmd的安装配置请参考 s3cmd 的相关文档

$ s3cmd mb s3://my-bucket # 创建一个 bucket
Bucket 's3://my-bucket/' created
$ s3cmd ls
2015-08-03 10:11  s3://my-bucket
$ cat test.sh 
source openrc
python foo.py $@
$ s3cmd put test.sh s3://my-bucket/my-dir/test.sh # 将 test.sh 上传到 my-bucket/my-dir 下
test.sh -> s3://my-bucket/my-dir/test.sh  [1 of 1]
 31 of 31   100% in    0s   196.90 B/s  done
$ s3cmd ls s3://my-bucket # 查看目录
                       DIR   s3://my-bucket/my-dir/
$ s3cmd get s3://my-bucket/my-dir/test.sh test1.sh # 下载 test.sh 这个对象
s3://my-bucket/my-dir/test.sh -> test1.sh  [1 of 1]
s3://my-bucket/my-dir/test.sh -> test1.sh  [1 of 1]
 31 of 31   100% in    0s     5.83 kB/s  done
$ cat test1.sh
source openrc
python foo.py $@

分类: Storage 标签: ,