这篇文章主要针对PHP生态中的kafka组件 php-rdkafka 进行内核源码分析,方便大家掌握该组件的使用方式;目前文章只覆盖kafka生产者部分。
一. 样例PHP代码
/**
 * Build the RdKafka producer from an application config array.
 *
 * Reads the keys brokerList, messageMaxBytes, requestTimeout and
 * sessionTimeout from $config and applies them, in that order, to a fresh
 * \RdKafka\Conf, then creates the producer and adds brokerList again via
 * addBrokers().
 *
 * NOTE(review): per librdkafka's CONFIGURATION.md, session.timeout.ms is a
 * consumer-side property and metadata.request.timeout.ms is marked as not
 * used; the broker list is also supplied twice (config property and
 * addBrokers()). Behaviour is kept unchanged here — confirm intent.
 *
 * @param array $config
 */
public function __construct($config)
{
    // librdkafka property name => key in $config, applied in this order.
    $propertyToConfigKey = [
        'metadata.broker.list'        => 'brokerList',
        'message.max.bytes'           => 'messageMaxBytes',
        'metadata.request.timeout.ms' => 'requestTimeout',
        'session.timeout.ms'          => 'sessionTimeout',
    ];

    $conf = new \RdKafka\Conf();
    foreach ($propertyToConfigKey as $property => $configKey) {
        $conf->set($property, $config[$configKey]);
    }

    $this->producer = new \RdKafka\Producer($conf);
    $this->producer->addBrokers($config['brokerList']);
}
/**
 * Produce the first entry of $data and wait until it has been flushed.
 *
 * Only $data[0] is sent: its 'topic' selects the topic and its 'value' is the
 * payload, produced with RD_KAFKA_PARTITION_UA. flush(10000) is attempted at
 * most 10 times, stopping at the first RD_KAFKA_RESP_ERR_NO_ERROR.
 *
 * @param array $data
 * @throws \RuntimeException when no flush attempt returned RD_KAFKA_RESP_ERR_NO_ERROR.
 */
public function sendMessage($data)
{
    $topic = $this->producer->newTopic($data[0]['topic']);
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, $data[0]['value']);

    // Non-blocking poll (timeout 0) to serve any pending events.
    $this->producer->poll(0);

    // Each attempt waits up to 10 s; give up after 10 attempts.
    $attempts = 0;
    do {
        $result = $this->producer->flush(10000);
        $attempts++;
    } while (RD_KAFKA_RESP_ERR_NO_ERROR !== $result && $attempts < 10);

    if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
        throw new \RuntimeException('Was unable to flush, messages might be lost!');
    }
}
二. RdKafka\Conf类内核实现
2.1 Conf钩子函数
Conf类(以及TopicConf类)在Conf.c源码文件的kafka_conf_minit函数中注册:
/* Fragment of kafka_conf_minit(): register RdKafka\Conf and RdKafka\TopicConf.
 * Both classes install kafka_conf_new as their create_object hook. */
INIT_NS_CLASS_ENTRY(tmpce, "RdKafka", "Conf", kafka_conf_fe);
ce_kafka_conf = zend_register_internal_class(&tmpce TSRMLS_CC);
ce_kafka_conf->create_object = kafka_conf_new; // install the object-creation hook
INIT_NS_CLASS_ENTRY(tmpce, "RdKafka", "TopicConf", kafka_topic_conf_fe);
ce_kafka_topic_conf = zend_register_internal_class(&tmpce TSRMLS_CC);
ce_kafka_topic_conf->create_object = kafka_conf_new; // same creation hook as RdKafka\Conf
2.2 Conf对象内核创建阶段
PHP内核接入层暴露如下:
/*
 * RdKafka\Conf::__construct()
 *
 * Takes no arguments. For the duration of the method, engine errors are
 * raised as InvalidArgumentException. When argument parsing succeeds, the
 * object is tagged KAFKA_CONF and receives a new librdkafka global
 * configuration from rd_kafka_conf_new().
 */
PHP_METHOD(RdKafka__Conf, __construct)
{
    zend_error_handling saved_handling;
    kafka_conf_object *conf_obj;

    zend_replace_error_handling(EH_THROW, spl_ce_InvalidArgumentException, &saved_handling TSRMLS_CC);

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "") != FAILURE) {
        conf_obj = get_custom_object_zval(kafka_conf_object, getThis());
        conf_obj->type = KAFKA_CONF;
        /* Key step: the librdkafka configuration object stored in the union. */
        conf_obj->u.conf = rd_kafka_conf_new();
    }

    /* Single exit point: the caller's error handling is restored on both paths. */
    zend_restore_error_handling(&saved_handling TSRMLS_CC);
}
/* }}} */
RdKafka\Conf 的对象创建函数为 kafka_conf_new,其中的关键是 kafka_conf_object 数据结构,定义如下:
/* Extension-side state behind RdKafka\Conf / RdKafka\TopicConf objects
 * (both classes use kafka_conf_new as their create_object hook). */
typedef struct _kafka_conf_object {
/* PHP 5 layout: the embedded zend_object is the first member. */
#if PHP_MAJOR_VERSION < 7
zend_object std;
#endif
/* Selects the active union member: KAFKA_CONF -> u.conf, KAFKA_TOPIC_CONF -> u.topic_conf. */
kafka_conf_type type;
union {
rd_kafka_conf_t *conf; // librdkafka global configuration (type == KAFKA_CONF); type defined by librdkafka
rd_kafka_topic_conf_t *topic_conf; // librdkafka topic configuration (type == KAFKA_TOPIC_CONF); type defined by librdkafka
} u;
/* PHP-level callbacks; kafka_init() copies them onto the kafka object via kafka_conf_callbacks_copy(). */
kafka_conf_callbacks cbs;
/* PHP 7+ layout: the embedded zend_object is the last member.
 * NOTE(review): presumably because get_custom_object_zval() recovers this
 * wrapper from the zend_object by offset — confirm against that macro. */
#if PHP_MAJOR_VERSION >= 7
zend_object std;
#endif
} kafka_conf_object;
/*
 * create_object hook shared by RdKafka\Conf and RdKafka\TopicConf
 * (PHP 5 object API: returns a zend_object_value).
 *
 * Allocates a kafka_conf_object, initialises its embedded zend_object and
 * declared properties, then stores it in the object store with
 * zend_objects_destroy_object as destructor and kafka_conf_free as free
 * handler. The librdkafka configuration itself is not created here; for
 * RdKafka\Conf that happens in __construct() via rd_kafka_conf_new().
 */
static zend_object_value kafka_conf_new(zend_class_entry *class_type TSRMLS_DC) /* {{{ */
{
    zend_object_value retval;
    kafka_conf_object *intern;

    /* Allocate the kafka conf wrapper object. */
    intern = alloc_object(intern, class_type);
    zend_object_std_init(&intern->std, class_type TSRMLS_CC);
    object_properties_init(&intern->std, class_type);

    /* Register the object and hand the resulting handle back to the PHP layer. */
    STORE_OBJECT(retval, intern, (zend_objects_store_dtor_t) zend_objects_destroy_object, kafka_conf_free, NULL);
    SET_OBJECT_HANDLERS(retval, &handlers);

    return retval;
}
/* }}} */
2.3 Conf对象Set函数
PHP_METHOD(RdKafka__Conf, set)
{
char *name;
arglen_t name_len;
char *value;
arglen_t value_len;
kafka_conf_object *intern;
rd_kafka_conf_res_t ret = 0;
char errstr[512];
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss", &name, &name_len, &value, &value_len) == FAILURE) {
return;
}
intern = get_kafka_conf_object(getThis() TSRMLS_CC); //获取kafka_conf_new创建的kafka_conf_object对象
if (!intern) {
return;
}
errstr[0] = '\0';
switch (intern->type) { //获取librdkafka 的kafka config类型
case KAFKA_CONF: //进行librdkafka KAFKA_CONF参数设置
ret = rd_kafka_conf_set(intern->u.conf, name, value, errstr, sizeof(errstr));
break;
case KAFKA_TOPIC_CONF: //进行librdkafka KAFKA_TOPIC_CONF参数设置
ret = rd_kafka_topic_conf_set(intern->u.topic_conf, name, value, errstr, sizeof(errstr));
break;
}
switch (ret) { //此处的ret返回值为ibrdkafka库的执行状态返回,如果一场则进行异常上报
case RD_KAFKA_CONF_UNKNOWN: // -2
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_CONF_UNKNOWN TSRMLS_CC);
return;
case RD_KAFKA_CONF_INVALID: // -1
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_CONF_INVALID TSRMLS_CC);
return;
case RD_KAFKA_CONF_OK: // 0
break;
}
}
/* }}} */
这个set阶段,就是调用librdkafka提供的set系列函数(rd_kafka_conf_set / rd_kafka_topic_conf_set)进行参数传递。
三. RdKafka\Producer类内核实现
3.1 Producer 类内核注册
/* Module-init fragment: register RdKafka\Producer with ce_kafka passed as the
 * parent class entry.
 * NOTE(review): ce_kafka is presumably the RdKafka\Kafka base class that
 * provides addBrokers/newTopic/poll/flush — confirm. In a compilable file,
 * kafka_producer_fe below must be declared before this use. */
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Producer", kafka_producer_fe);
ce_kafka_producer = rdkafka_register_internal_class_ex(&ce, ce_kafka TSRMLS_CC);
// The two statements above run during extension module initialisation and register the class structure.
/* Methods declared directly on RdKafka\Producer: only __construct. */
static const zend_function_entry kafka_producer_fe[] = {
PHP_ME(RdKafka__Producer, __construct, arginfo_kafka_producer___construct, ZEND_ACC_PUBLIC)
PHP_FE_END
};
接下来看一下RdKafka\Producer的构造函数,这个阶段的关键是kafka_init函数:
/*
 * RdKafka\Producer::__construct([?RdKafka\Conf $conf])
 *
 * Accepts an optional, nullable RdKafka\Conf. For the duration of the method,
 * engine errors are raised as InvalidArgumentException. When parsing succeeds,
 * kafka_init() creates the underlying librdkafka handle as RD_KAFKA_PRODUCER.
 */
PHP_METHOD(RdKafka__Producer, __construct)
{
    zend_error_handling saved_handling;
    zval *conf_zv = NULL;

    zend_replace_error_handling(EH_THROW, spl_ce_InvalidArgumentException, &saved_handling TSRMLS_CC);

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|O!", &conf_zv, ce_kafka_conf) != FAILURE) {
        /* Create the librdkafka handle in producer mode. */
        kafka_init(getThis(), RD_KAFKA_PRODUCER, conf_zv TSRMLS_CC);
    }

    /* Single exit point: the caller's error handling is restored on both paths. */
    zend_restore_error_handling(&saved_handling TSRMLS_CC);
}
/* }}} */
四. 重要公共内核函数分析
4.1 kafka_init 内核函数(生产者、消费者)
这个是创建kafka生产者、消费者的内核函数,具体函数实现如下:
/*
 * Shared constructor logic for the producer and consumer classes.
 *
 * this_ptr  the PHP object being constructed (wraps a kafka_object)
 * type      RD_KAFKA_PRODUCER or RD_KAFKA_CONSUMER; stored in intern->type
 * zconf     optional RdKafka\Conf zval, may be NULL
 *
 * If a Conf is given, its librdkafka configuration is duplicated and its
 * PHP callbacks are copied onto this object. rd_kafka_new() then creates the
 * handle, and the hash tables used to track topics are initialised; consumers
 * also get the consuming and queues tables.
 *
 * On failure, a ce_kafka_exception carrying librdkafka's error string is
 * thrown, and intern->rk and the hash tables are left untouched.
 */
static void kafka_init(zval *this_ptr, rd_kafka_type_t type, zval *zconf TSRMLS_DC) /* {{{ */
{
    char errstr[512];
    rd_kafka_t *rk;
    kafka_object *intern; /* the key extension-side kafka object */
    kafka_conf_object *conf_intern;
    rd_kafka_conf_t *conf = NULL;

    /* Extension-side object behind this_ptr. */
    intern = get_custom_object_zval(kafka_object, this_ptr);
    intern->type = type;

    if (zconf) {
        conf_intern = get_kafka_conf_object(zconf TSRMLS_CC);
        if (conf_intern) {
            /* Hand librdkafka a duplicate. Per the librdkafka API docs,
             * rd_kafka_new() consumes the conf it is given on success. */
            conf = rd_kafka_conf_dup(conf_intern->u.conf);
            /* Copy the PHP-level callbacks onto this kafka object. */
            kafka_conf_callbacks_copy(&intern->cbs, &conf_intern->cbs TSRMLS_CC);
            intern->cbs.zrk = *this_ptr;
            /* Opaque pointer that librdkafka passes back to the callbacks. */
            rd_kafka_conf_set_opaque(conf, &intern->cbs);
        }
    }

    /* Create the producer or consumer handle according to type. */
    rk = rd_kafka_new(type, conf, errstr, sizeof(errstr));
    if (rk == NULL) {
        /* rd_kafka_new() only takes ownership of conf when it succeeds.
         * On failure the duplicate is still ours and would otherwise leak. */
        if (conf) {
            rd_kafka_conf_destroy(conf);
        }
        zend_throw_exception(ce_kafka_exception, errstr, 0 TSRMLS_CC);
        return;
    }

    if (intern->cbs.log) {
        /* NOTE(review): per librdkafka docs, a NULL queue forwards log events
         * to the main queue served by rd_kafka_poll() — confirm for the
         * linked version. */
        rd_kafka_set_log_queue(rk, NULL);
    }

    /* Keep the librdkafka handle in the extension-side object. */
    intern->rk = rk;

    /* Consumers additionally track consumed partitions and queues. */
    if (type == RD_KAFKA_CONSUMER) {
        zend_hash_init(&intern->consuming, 0, NULL, (dtor_func_t)toppar_pp_dtor, 0);
        zend_hash_init(&intern->queues, 0, NULL, (dtor_func_t)kafka_queue_object_pre_free, 0);
    }

    /* Both producers and consumers track their topic objects. */
    zend_hash_init(&intern->topics, 0, NULL, (dtor_func_t)kafka_topic_object_pre_free, 0);
}
/* }}} */
4.2 addBrokers函数(生产者、消费者)
PHP_METHOD(RdKafka__Kafka, addBrokers)
{
char *broker_list;
arglen_t broker_list_len;
kafka_object *intern;
//把broker_list php语言层的字符串数组转换为 当前的broker_list字符串空间。
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &broker_list, &broker_list_len) == FAILURE) {
return;
}
intern = get_kafka_object(getThis() TSRMLS_CC); //获取当前intern对象 当前可能是消费者、或者生产者
if (!intern) {
return;
}
//利用librdkafka库进行集群节点的注入操作
RETURN_LONG(rd_kafka_brokers_add(intern->rk, broker_list));
}
/* }}} */
4.3 newTopic函数(生产者、消费者)
/*
 * RdKafka\Kafka::newTopic(string $topic[, ?RdKafka\TopicConf $conf])
 *
 * Creates a librdkafka topic handle and returns it wrapped in a PHP topic
 * object. The class is ce_kafka_consumer_topic for consumer handles and
 * ce_kafka_producer_topic for producer handles.
 *
 * The topic object keeps a reference on this RdKafka object and is recorded
 * in intern->topics, keyed by its own address.
 *
 * Returns nothing (null) when the extension object cannot be resolved, the
 * handle type is neither producer nor consumer, or librdkafka refuses to
 * create the topic.
 */
PHP_METHOD(RdKafka__Kafka, newTopic)
{
    char *topic;
    arglen_t topic_len;
    rd_kafka_topic_t *rkt;
    kafka_object *intern;
    kafka_topic_object *topic_intern;
    zend_class_entry *topic_type;
    zval *zconf = NULL;
    rd_kafka_topic_conf_t *conf = NULL;
    kafka_conf_object *conf_intern;

    /* Topic name plus an optional, nullable RdKafka\TopicConf. */
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|O!", &topic, &topic_len, &zconf, ce_kafka_topic_conf) == FAILURE) {
        return;
    }

    /* Extension-side object holding the librdkafka handle. */
    intern = get_kafka_object(getThis() TSRMLS_CC);
    if (!intern) {
        return;
    }

    /* Resolve the PHP topic class first, so an unsupported handle type
     * returns before any librdkafka resources (conf copy, topic handle)
     * have been allocated. */
    switch (intern->type) {
        case RD_KAFKA_CONSUMER:
            topic_type = ce_kafka_consumer_topic;
            break;
        case RD_KAFKA_PRODUCER:
            topic_type = ce_kafka_producer_topic;
            break;
        default:
            return;
    }

    /* Pass librdkafka a duplicate of the TopicConf's configuration. */
    if (zconf) {
        conf_intern = get_kafka_conf_object(zconf TSRMLS_CC);
        if (conf_intern) {
            conf = rd_kafka_topic_conf_dup(conf_intern->u.topic_conf);
        }
    }

    rkt = rd_kafka_topic_new(intern->rk, topic, conf);
    if (!rkt) {
        return;
    }

    /* Build the PHP topic object returned to the caller. Release the topic
     * handle if that fails, since nothing else references it yet. */
    if (object_init_ex(return_value, topic_type) != SUCCESS) {
        rd_kafka_topic_destroy(rkt);
        return;
    }

    topic_intern = get_custom_object_zval(kafka_topic_object, return_value);
    if (!topic_intern) {
        rd_kafka_topic_destroy(rkt);
        return;
    }

    topic_intern->rkt = rkt;
#if PHP_MAJOR_VERSION >= 7
    topic_intern->zrk = *getThis();
#else
    topic_intern->zrk = getThis();
#endif
    /* The topic object holds a reference on its parent RdKafka object. */
    Z_ADDREF_P(P_ZEVAL(topic_intern->zrk));
    zend_hash_index_add_ptr(&intern->topics, (zend_ulong)topic_intern, topic_intern);
}
/* }}} */
4.4 poll函数
poll 对给定的 kafka 句柄轮询事件,事件会触发应用程序提供的回调。timeout_ms 参数指定该调用阻塞等待事件的最长时间(以毫秒为单位):非阻塞调用传 0,需要无限等待则传 -1。
PHP_METHOD(RdKafka__Kafka, poll)
{
kafka_object *intern;
zend_long timeout;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &timeout) == FAILURE) {
return;
}
//获取php内核当前环境的kafka对象
intern = get_kafka_object(getThis() TSRMLS_CC);
if (!intern) {
return;
}
//调用librdkafka库rd_kafka_poll函数等待kafka事件。
RETURN_LONG(rd_kafka_poll(intern->rk, timeout));
}
4.5 flush函数
等待所有未完成的生产请求结束。通常应在销毁生产者实例之前调用,以确保所有排队中的生产请求都已完成。该函数会调用rd_kafka_poll(),因此会触发回调。
如果已启用RD_KAFKA_EVENT_DR(通过rd_kafka_conf_set_events函数),该函数不会调用rd_kafka_poll(),而是等待由librdkafka处理的消息计数降为零。这要求应用程序在单独的线程中处理(serve)事件队列。在此模式下只统计消息数量,不统计其他类型的排队事件。
返回值:如果在所有未完成的请求完成之前就已达到timeout_ms,返回RD_KAFKA_RESP_ERR__TIMED_OUT;否则返回RD_KAFKA_RESP_ERR_NO_ERROR。
/* {{{ proto int RdKafka\Kafka::flush(int $timeout_ms)
   Wait until all outstanding produce requests, et.al, are completed.
   Returns the result code of rd_kafka_flush() as an int. */
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_flush, 0, 0, 1)
    ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, flush)
{
    zend_long timeout_ms;
    kafka_object *kafka;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &timeout_ms) == FAILURE) {
        return;
    }

    /* The handle may belong to a producer or a consumer object. */
    kafka = get_kafka_object(getThis() TSRMLS_CC);
    if (kafka != NULL) {
        /* rd_kafka_flush() drives rd_kafka_poll() internally while it waits. */
        RETURN_LONG(rd_kafka_flush(kafka->rk, timeout_ms));
    }
}
/* }}} */
评论已关闭