PHP-rdkafka 内核扩展相关源码分析

@李彪  March 6, 2020

这篇文章主要针对 PHP 生态的 Kafka 组件 php-rdkafka 进行内核源码分析，方便大家掌握该组件的使用方法。目前文章主要涉及 Kafka 生产者部分。

一. 样例PHP代码

public function __construct($config)
    {
        // librdkafka property name => key in $config that holds its value.
        // The properties are applied to the Conf in exactly this order.
        $propertyToConfigKey = array(
            'metadata.broker.list'        => 'brokerList',
            'message.max.bytes'           => 'messageMaxBytes',
            'metadata.request.timeout.ms' => 'requestTimeout',
            // NOTE(review): librdkafka documents session.timeout.ms as a
            // consumer property; a producer presumably ignores it — confirm.
            'session.timeout.ms'          => 'sessionTimeout',
        );

        $conf = new \RdKafka\Conf();
        foreach ($propertyToConfigKey as $property => $configKey) {
            $conf->set($property, $config[$configKey]);
        }

        // Create the producer from the populated Conf, then register the same
        // broker list once more through addBrokers() (this repeats the
        // metadata.broker.list setting above).
        $this->producer = new \RdKafka\Producer($conf);
        $this->producer->addBrokers($config['brokerList']);
    }

    public function sendMessage($data){
        // Produce only the first entry of $data to its topic. librdkafka picks
        // the partition (RD_KAFKA_PARTITION_UA); msgflags is 0.
        $topic = $this->producer->newTopic($data[0]['topic']);
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $data[0]['value']);

        // Serve any pending events without blocking.
        $this->producer->poll(0);

        // Flush with a 10000 ms timeout. Retry up to 10 times and stop at the
        // first success.
        $attempts = 0;
        do {
            $result = $this->producer->flush(10000);
            $attempts++;
        } while ($result !== RD_KAFKA_RESP_ERR_NO_ERROR && $attempts < 10);

        if ($result !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            throw new \RuntimeException('Was unable to flush, messages might be lost!');
        }
    }

二. RdKafka\Conf 类内核实现

2.1 Conf钩子函数

Conf 类在 Conf.c 源码文件的 kafka_conf_minit 函数中注册：

    // Register RdKafka\Conf with the Zend engine, using the kafka_conf_fe method table.
    INIT_NS_CLASS_ENTRY(tmpce, "RdKafka", "Conf", kafka_conf_fe);
    ce_kafka_conf = zend_register_internal_class(&tmpce TSRMLS_CC);
    ce_kafka_conf->create_object = kafka_conf_new; // object-creation hook, invoked by the engine for each new instance

    // RdKafka\TopicConf is registered the same way and shares the same
    // create_object hook. The two kinds are told apart afterwards through
    // kafka_conf_object.type. RdKafka\Conf::__construct sets KAFKA_CONF;
    // presumably TopicConf's constructor sets KAFKA_TOPIC_CONF (not shown here).
    INIT_NS_CLASS_ENTRY(tmpce, "RdKafka", "TopicConf", kafka_topic_conf_fe);
    ce_kafka_topic_conf = zend_register_internal_class(&tmpce TSRMLS_CC);
    ce_kafka_topic_conf->create_object = kafka_conf_new;

2.2 Conf对象内核创建阶段

暴露给 PHP 层的构造方法如下：

PHP_METHOD(RdKafka__Conf, __construct)
{
    zend_error_handling saved_handling;
    kafka_conf_object *conf_obj;

    /* While the constructor runs, engine errors (e.g. bad arguments) are
     * raised as InvalidArgumentException. */
    zend_replace_error_handling(EH_THROW, spl_ce_InvalidArgumentException, &saved_handling TSRMLS_CC);

    /* The constructor takes no arguments. Initialise only after a successful parse. */
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "") != FAILURE) {
        conf_obj = get_custom_object_zval(kafka_conf_object, getThis());
        conf_obj->type = KAFKA_CONF;
        /* Key call: allocate a fresh librdkafka global configuration. */
        conf_obj->u.conf = rd_kafka_conf_new();
    }

    /* Every path restores the caller's error handling. */
    zend_restore_error_handling(&saved_handling TSRMLS_CC);
}
/* }}} */

RdKafka\Conf 的对象创建函数为 kafka_conf_new，其中的关键是 kafka_conf_object 数据结构：

/* Internal state behind RdKafka\Conf and RdKafka\TopicConf instances. */
typedef struct _kafka_conf_object {
#if PHP_MAJOR_VERSION < 7
    zend_object                 std;  /* PHP 5: the Zend object header is the first member */
#endif
    kafka_conf_type type;  /* selects the active union member: KAFKA_CONF -> u.conf, KAFKA_TOPIC_CONF -> u.topic_conf */
    union {
        rd_kafka_conf_t         *conf;  /* key member: librdkafka global configuration (type defined by librdkafka) */
        rd_kafka_topic_conf_t   *topic_conf; /* key member: librdkafka topic configuration (type defined by librdkafka) */
    } u;
    kafka_conf_callbacks cbs;  /* PHP callbacks configured on this object; kafka_init copies them into the Kafka object */
#if PHP_MAJOR_VERSION >= 7
    /* PHP 7+: the embedded zend_object must be the last member, because the
     * engine lays out the object's property table directly after it. */
    zend_object                 std;
#endif
} kafka_conf_object;
/* create_object handler shared by RdKafka\Conf and RdKafka\TopicConf.
 * It allocates and initialises the kafka_conf_object behind a new PHP
 * instance. The librdkafka configuration is created later, by the
 * constructor (e.g. RdKafka\Conf::__construct calls rd_kafka_conf_new()). */
static zend_object_value kafka_conf_new(zend_class_entry *class_type TSRMLS_DC) /* {{{ */
{
    zend_object_value retval;
    kafka_conf_object *intern;

    intern = alloc_object(intern, class_type); /* allocate the kafka_conf_object */
    zend_object_std_init(&intern->std, class_type TSRMLS_CC);
    object_properties_init(&intern->std, class_type);

    /* Store the object in the engine's object store. kafka_conf_free is
     * presumably the free-storage callback; verify against STORE_OBJECT. */
    STORE_OBJECT(retval, intern, (zend_objects_store_dtor_t) zend_objects_destroy_object, kafka_conf_free, NULL);
    SET_OBJECT_HANDLERS(retval, &handlers);

    /* Hand the new object back to the PHP layer. */
    return retval;
}
/* }}} */

2.3 Conf对象Set函数

PHP_METHOD(RdKafka__Conf, set)
{
    kafka_conf_object *conf_obj;
    char *name;
    arglen_t name_len;
    char *value;
    arglen_t value_len;
    char errbuf[512];
    rd_kafka_conf_res_t res = 0;

    /* set(string $name, string $value) */
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss", &name, &name_len, &value, &value_len) == FAILURE) {
        return;
    }

    /* Fetch the kafka_conf_object allocated by kafka_conf_new. */
    conf_obj = get_kafka_conf_object(getThis() TSRMLS_CC);
    if (!conf_obj) {
        return;
    }

    errbuf[0] = '\0';

    /* Call the librdkafka setter that matches this configuration kind. */
    if (conf_obj->type == KAFKA_CONF) {
        res = rd_kafka_conf_set(conf_obj->u.conf, name, value, errbuf, sizeof(errbuf));
    } else if (conf_obj->type == KAFKA_TOPIC_CONF) {
        res = rd_kafka_topic_conf_set(conf_obj->u.topic_conf, name, value, errbuf, sizeof(errbuf));
    }

    /* librdkafka signals failure with RD_KAFKA_CONF_UNKNOWN (-2, unknown
     * property) or RD_KAFKA_CONF_INVALID (-1, invalid value). Either becomes
     * a ce_kafka_exception whose code is that result value.
     * RD_KAFKA_CONF_OK (0) returns normally. */
    if (res == RD_KAFKA_CONF_UNKNOWN || res == RD_KAFKA_CONF_INVALID) {
        zend_throw_exception(ce_kafka_exception, errbuf, res TSRMLS_CC);
    }
}
/* }}} */

set 阶段就是调用 librdkafka 提供的 set 函数族（rd_kafka_conf_set / rd_kafka_topic_conf_set）来设置参数。

三. RdKafka\Producer 类内核实现

3.1 Producer 类内核注册

// Module init: register RdKafka\Producer, passing ce_kafka (RdKafka\Kafka) as
// what is presumably its parent class.
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Producer", kafka_producer_fe);
ce_kafka_producer = rdkafka_register_internal_class_ex(&ce, ce_kafka TSRMLS_CC);
// Method table for RdKafka\Producer. Only __construct is declared here. The
// other methods used by producers (addBrokers, newTopic, poll, flush) are
// defined as RdKafka\Kafka methods and are presumably inherited.
static const zend_function_entry kafka_producer_fe[] = {
    PHP_ME(RdKafka__Producer, __construct, arginfo_kafka_producer___construct, ZEND_ACC_PUBLIC)
    PHP_FE_END
};

接下来看一下 RdKafka\Producer 的构造函数，这个阶段的关键是 kafka_init 函数：

PHP_METHOD(RdKafka__Producer, __construct)
{
    zend_error_handling saved_handling;
    zval *conf_zv = NULL;

    /* While the constructor runs, engine errors (e.g. a wrong argument type)
     * are raised as InvalidArgumentException. */
    zend_replace_error_handling(EH_THROW, spl_ce_InvalidArgumentException, &saved_handling TSRMLS_CC);

    /* Optional, nullable RdKafka\Conf argument. */
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|O!", &conf_zv, ce_kafka_conf) != FAILURE) {
        /* Build the librdkafka handle in producer mode (RD_KAFKA_PRODUCER). */
        kafka_init(getThis(), RD_KAFKA_PRODUCER, conf_zv TSRMLS_CC);
    }

    zend_restore_error_handling(&saved_handling TSRMLS_CC);
}
/* }}} */

四. 重要公共内核函数分析

4.1 kafka_init 内核函数(生产者、消费者)

这个是创建kafka生产者、消费者的内核函数,具体函数实现如下:

/* Shared constructor logic for producers and consumers. It creates a
 * librdkafka handle of the given type and stores it in the kafka_object
 * behind this_ptr.
 *
 * this_ptr: the PHP object being constructed.
 * type:     RD_KAFKA_PRODUCER or RD_KAFKA_CONSUMER.
 * zconf:    optional RdKafka\Conf zval; NULL means librdkafka defaults.
 *
 * If rd_kafka_new() fails, an exception carrying librdkafka's error string
 * is thrown and intern->rk is left unset. */
static void kafka_init(zval *this_ptr, rd_kafka_type_t type, zval *zconf TSRMLS_DC) /* {{{ */
{
    char errstr[512];
    rd_kafka_t *rk;
    kafka_object *intern; /* extension-side state for this Kafka object */
    kafka_conf_object *conf_intern;
    rd_kafka_conf_t *conf = NULL;

    intern = get_custom_object_zval(kafka_object, this_ptr);
    intern->type = type;

    if (zconf) {
        conf_intern = get_kafka_conf_object(zconf TSRMLS_CC);
        if (conf_intern) {
            /* rd_kafka_new() takes ownership of the conf it is given, so pass
             * a duplicate and leave the PHP Conf object's configuration intact. */
            conf = rd_kafka_conf_dup(conf_intern->u.conf);
            /* Copy the PHP callbacks configured on the Conf into this object. */
            kafka_conf_callbacks_copy(&intern->cbs, &conf_intern->cbs TSRMLS_CC);
            intern->cbs.zrk = *this_ptr;
            /* librdkafka passes this opaque pointer to its callbacks, which
             * lets them reach intern->cbs. */
            rd_kafka_conf_set_opaque(conf, &intern->cbs);
        }
    }

    /* Key step: create the producer or consumer handle according to type. */
    rk = rd_kafka_new(type, conf, errstr, sizeof(errstr));

    if (rk == NULL) {
        /* On failure, ownership of conf stays with the caller. Release the
         * duplicate here, otherwise it leaks. */
        if (conf) {
            rd_kafka_conf_destroy(conf);
        }
        zend_throw_exception(ce_kafka_exception, errstr, 0 TSRMLS_CC);
        return;
    }

    if (intern->cbs.log) {
        /* A PHP log callback is set: forward librdkafka log events to the main
         * queue (NULL), so they are served through the poll APIs rather than
         * from librdkafka's internal threads. */
        rd_kafka_set_log_queue(rk, NULL);
    }

    /* Key step: keep the librdkafka handle in the extension's object. */
    intern->rk = rk;

    /* Consumers also track consumed partitions and queues, each table with
     * its own element destructor. */
    if (type == RD_KAFKA_CONSUMER) {
        zend_hash_init(&intern->consuming, 0, NULL, (dtor_func_t)toppar_pp_dtor, 0);
        zend_hash_init(&intern->queues, 0, NULL, (dtor_func_t)kafka_queue_object_pre_free, 0);
    }

    /* Producers and consumers both track the topic objects they create. */
    zend_hash_init(&intern->topics, 0, NULL, (dtor_func_t)kafka_topic_object_pre_free, 0);
}
/* }}} */

4.2 addBrokers函数(生产者、消费者)

PHP_METHOD(RdKafka__Kafka, addBrokers)
{
    char *broker_list;
    arglen_t broker_list_len;
    kafka_object *intern;

    //把broker_list php语言层的字符串数组转换为 当前的broker_list字符串空间。 
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &broker_list, &broker_list_len) == FAILURE) {
        return;
    }

    intern = get_kafka_object(getThis() TSRMLS_CC); //获取当前intern对象 当前可能是消费者、或者生产者
    if (!intern) {
        return;
    }

   //利用librdkafka库进行集群节点的注入操作
    RETURN_LONG(rd_kafka_brokers_add(intern->rk, broker_list));
}
/* }}} */

4.3 newTopic函数(生产者、消费者)

PHP_METHOD(RdKafka__Kafka, newTopic)
{
    char *topic;
    arglen_t topic_len;
    rd_kafka_topic_t *rkt;
    kafka_object *intern;
    kafka_topic_object *topic_intern;
    zend_class_entry *topic_type;
    zval *zconf = NULL;
    rd_kafka_topic_conf_t *conf = NULL;
    kafka_conf_object *conf_intern;

    /* newTopic(string $topic, ?RdKafka\TopicConf $conf = null) */
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|O!", &topic, &topic_len, &zconf, ce_kafka_topic_conf) == FAILURE) {
        return;
    }

    /* The Kafka object (producer or consumer) this topic belongs to. */
    intern = get_kafka_object(getThis() TSRMLS_CC);
    if (!intern) {
        return;
    }

    /* Choose the PHP topic class before allocating anything. An unexpected
     * handle type then returns without leaving a librdkafka topic behind. */
    switch (intern->type) {
        case RD_KAFKA_CONSUMER:
            topic_type = ce_kafka_consumer_topic;
            break;
        case RD_KAFKA_PRODUCER:
            topic_type = ce_kafka_producer_topic;
            break;
        default:
            return;
    }

    if (zconf) {
        conf_intern = get_kafka_conf_object(zconf TSRMLS_CC);
        if (conf_intern) {
            /* rd_kafka_topic_new() takes ownership of conf, so give it a copy. */
            conf = rd_kafka_topic_conf_dup(conf_intern->u.topic_conf);
        }
    }

    /* Create the librdkafka topic handle. */
    rkt = rd_kafka_topic_new(intern->rk, topic, conf);
    if (!rkt) {
        /* NOTE(review): returns NULL to PHP without raising; consider throwing. */
        return;
    }

    /* Create the PHP topic object returned to the caller. */
    if (object_init_ex(return_value, topic_type) != SUCCESS) {
        rd_kafka_topic_destroy(rkt); /* no PHP object owns it yet */
        return;
    }

    topic_intern = get_custom_object_zval(kafka_topic_object, return_value);
    if (!topic_intern) {
        rd_kafka_topic_destroy(rkt);
        return;
    }

    topic_intern->rkt = rkt;
    /* Keep a counted reference to the parent Kafka object. */
#if PHP_MAJOR_VERSION >= 7
    topic_intern->zrk = *getThis();
#else
    topic_intern->zrk = getThis();
#endif
    Z_ADDREF_P(P_ZEVAL(topic_intern->zrk));

    /* Register the topic on its parent, keyed by the topic object's address. */
    zend_hash_index_add_ptr(&intern->topics, (zend_ulong)topic_intern, topic_intern);
}
/* }}} */

4.4 poll函数

poll 用于轮询 Kafka 句柄上的事件，事件会触发应用程序提供的回调函数。timeout_ms 参数指定该调用阻塞等待事件的最长时间（单位为毫秒）：传入 0 表示非阻塞调用，传入 -1 表示无限期等待。返回值为本次处理的事件数量。

PHP_METHOD(RdKafka__Kafka, poll)
{
    kafka_object *intern;
    zend_long timeout;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &timeout) == FAILURE) {
        return;
    }

    //获取php内核当前环境的kafka对象
    intern = get_kafka_object(getThis() TSRMLS_CC);
    if (!intern) {
        return;
    }

    //调用librdkafka库rd_kafka_poll函数等待kafka事件。
    RETURN_LONG(rd_kafka_poll(intern->rk, timeout));
}

4.5 flush函数

flush 会等待所有未完成的生产请求结束。通常应在销毁生产者实例之前调用，以确保所有已排队和正在发送中的生产请求在退出前都已完成。该函数内部会调用 rd_kafka_poll()，因此会触发回调。

如果通过 rd_kafka_conf_set_events() 启用了 RD_KAFKA_EVENT_DR，该函数不会调用 rd_kafka_poll()，而是等待 librdkafka 内部处理中的消息数降为零。这要求应用程序在单独的线程中处理事件队列。在此模式下只统计消息，不统计其他类型的排队事件。

返回值：如果在所有未完成的请求完成之前就已达到 timeout_ms，返回 RD_KAFKA_RESP_ERR__TIMED_OUT；否则返回 RD_KAFKA_RESP_ERR_NO_ERROR。

/* {{{ proto int RdKafka\Kafka::flush(int $timeout_ms)
   Wait until all outstanding produce requests, et al., are completed. */
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_flush, 0, 0, 1)
    ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, flush)
{
    zend_long timeout_ms;
    kafka_object *kafka;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &timeout_ms) == FAILURE) {
        return;
    }

    kafka = get_kafka_object(getThis() TSRMLS_CC);
    if (kafka == NULL) {
        return;
    }

    /* Returns RD_KAFKA_RESP_ERR_NO_ERROR when everything outstanding has
     * completed, or RD_KAFKA_RESP_ERR__TIMED_OUT if timeout_ms elapsed first.
     * rd_kafka_flush() serves callbacks through rd_kafka_poll() internally,
     * unless RD_KAFKA_EVENT_DR is enabled. */
    RETURN_LONG(rd_kafka_flush(kafka->rk, timeout_ms));
}
/* }}} */

评论已关闭