异步任务的封装
//yar_transport.h
/* One queued asynchronous RPC call. Yar_Concurrent_Client::call() fills this
 * in and wraps it in an le_calldata resource stored in the class's
 * _callstack array. */
typedef struct _yar_call_data {
ulong sequence; /* task ID; call() sets it to the next free _callstack index plus one */
zend_string *uri; /* server URI, copied from call()'s $uri */
zend_string *method; /* remote method name, copied from call()'s $method */
zval callback; /* per-call success callback; left unset when call() receives none */
zval ecallback; /* per-call error callback; left unset when call() receives none */
zval parameters; /* call arguments; only set when call() receives an array */
zval options; /* per-call options; only set when call() receives an array */
} yar_call_data_t;
Yar用yar_call_data_t
表示一个异步任务,sequence是从1开始的任务ID,除了sequence,其他基本上就是对应Yar_Concurrent_Client::call()
上的同名参数。为方便作为一个PHP变量使用该结构体,该结构体平时被Yar包装成一个le_calldata
型Resource。
异步客户端
// PHP class outline reverse-engineered from the C source; no PHP definition file exists.
// Signatures are illustrative pseudo-code. Note that the C prototype of call()
// declares $options = array(), i.e. the last argument is optional.
class Yar_Concurrent_Client{
/** @var Resource[] array of le_calldata resources (the queued calls) */
protected static $_callstack;
/** @var Callable callback invoked when an RPC succeeds */
protected static $_callback;
/** @var Callable callback invoked when an RPC fails */
protected static $_error_callback;
/** @var boolean $_start true while the concurrent calls are being executed, false otherwise */
protected static $_start;
public static call($uri , $method, $parameters=null, $callback=null,$error_callback=null,$options):int
public static loop($callback=null , $error_callback=null):bool
}
Yar_Concurrent_Client定义如上,
看下Yar_Concurrent_Client::call()
用于注册一个异步RPC调用
//yar_client.c
/* {{{ proto Yar_Concurrent_Client::call($uri, $method, $parameters = NULL, $callback = NULL, $error_callback = NULL, $options = array())
 *
 * Queues one asynchronous RPC call. Nothing is sent here: the call is stored
 * as an le_calldata resource in Yar_Concurrent_Client::$_callstack and is
 * executed later by loop().
 *
 * Returns the 1-based sequence number of the queued call, or FALSE when a
 * callback is not callable or the client is already running. Returns without
 * setting a value when argument parsing or the uri/method checks fail.
 */
PHP_METHOD(yar_concurrent_client, call) {
	zend_string *uri, *method;
	zend_string *name = NULL;
	long sequence;
	zval *callstack, item, *status;
	zval *error_callback = NULL, *callback = NULL, *parameters = NULL, *options = NULL;
	yar_call_data_t *entry;

	if (zend_parse_parameters(ZEND_NUM_ARGS(), "SS|a!z!za",
				&uri, &method, &parameters, &callback, &error_callback, &options) == FAILURE) {
		return;
	}

	if (!ZSTR_LEN(uri)) {
		php_error_docref(NULL, E_WARNING, "first parameter is expected to be a valid rpc server uri");
		return;
	}

	/* strncasecmp() returns non-zero on mismatch: reject when neither scheme matches */
	if (strncasecmp(ZSTR_VAL(uri), "http://", sizeof("http://") - 1)
			&& strncasecmp(ZSTR_VAL(uri), "https://", sizeof("https://") - 1)) {
		php_error_docref(NULL, E_WARNING, "only http protocol is supported in concurrent client for now");
		return;
	}

	if (!ZSTR_LEN(method)) {
		php_error_docref(NULL, E_WARNING, "second parameter is expected to be a valid rpc api name");
		return;
	}

	/* validate the optional callbacks */
	if (callback && !Z_ISNULL_P(callback) &&
			!zend_is_callable(callback, 0, &name)) {
		php_error_docref1(NULL, ZSTR_VAL(name), E_ERROR, "fourth parameter is expected to be a valid callback");
		zend_string_release(name);
		RETURN_FALSE;
	}

	if (name) {
		zend_string_release(name);
		name = NULL;
	}

	if (error_callback && !Z_ISNULL_P(error_callback) &&
			!zend_is_callable(error_callback, 0, &name)) {
		php_error_docref1(NULL, ZSTR_VAL(name), E_ERROR, "fifth parameter is expected to be a valid error callback");
		zend_string_release(name);
		RETURN_FALSE;
	}

	if (name) {
		zend_string_release(name);
	}

	/* calls cannot be queued while the concurrent client is running */
	status = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_start"), 0);
	if (Z_TYPE_P(status) == IS_TRUE) {
		php_error_docref(NULL, E_WARNING, "concurrent client has already started");
		RETURN_FALSE;
	}

	/* ecalloc zero-fills the entry; fields that are not assigned below stay unset */
	entry = ecalloc(1, sizeof(yar_call_data_t));

	entry->uri = zend_string_copy(uri);
	entry->method = zend_string_copy(method);

	if (callback && !Z_ISNULL_P(callback)) {
		ZVAL_COPY(&entry->callback, callback);
	}
	if (error_callback && !Z_ISNULL_P(error_callback)) {
		ZVAL_COPY(&entry->ecallback, error_callback);
	}
	if (parameters && IS_ARRAY == Z_TYPE_P(parameters)) {
		ZVAL_COPY(&entry->parameters, parameters);
	}
	if (options && IS_ARRAY == Z_TYPE_P(options)) {
		ZVAL_COPY(&entry->options, options);
	}

	callstack = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0);
	/* lazily initialise Yar_Concurrent_Client::$_callstack to an empty array */
	if (Z_ISNULL_P(callstack)) {
		zval tmp;
		array_init(&tmp);
		zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), &tmp);
		zval_ptr_dtor(&tmp);
	}

	/* wrap the entry in an le_calldata resource and append it to $_callstack */
	ZVAL_RES(&item, zend_register_resource(entry, le_calldata));
	sequence = zend_hash_next_free_element(Z_ARRVAL_P(callstack));
	entry->sequence = sequence + 1;
	zend_hash_next_index_insert(Z_ARRVAL_P(callstack), &item);

	RETURN_LONG(entry->sequence);
}
/* }}} */
整个call调用,除去防御代码,其实就做了一件事,构造一个 le_calldata型
Resource,并添加到Yar_Concurrent_Client::_callstack
数组中。
接着是Yar_Concurrent_Client::loop()
//yar_client.c
/* Checks one optional callback argument of loop(). A missing or NULL argument
 * is accepted. Returns 1 when the argument is acceptable, 0 after reporting
 * `complaint` for a value that is not callable. */
static int yar_concurrent_loop_check_callback(zval *candidate, const char *complaint) {
	zend_string *callable_name = NULL;
	int acceptable = 1;

	if (candidate && !Z_ISNULL_P(candidate) &&
			!zend_is_callable(candidate, 0, &callable_name)) {
		php_error_docref1(NULL, ZSTR_VAL(callable_name), E_ERROR, "%s", complaint);
		acceptable = 0;
	}
	if (callable_name) {
		zend_string_release(callable_name);
	}
	return acceptable;
}

/* Yar_Concurrent_Client::loop($callback = NULL, $error_callback = NULL)
 *
 * Executes every call queued by call(). Returns FALSE when the client is
 * already running or a callback is invalid, TRUE when nothing is queued, and
 * otherwise the result of php_yar_concurrent_client_handle() as a boolean. */
PHP_METHOD(yar_concurrent_client, loop) {
	zval *on_success = NULL, *on_failure = NULL;
	zval *running, *queue;
	uint outcome = 0;

	if (zend_parse_parameters(ZEND_NUM_ARGS(), "|zz", &on_success, &on_failure) == FAILURE) {
		return;
	}

	running = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_start"), 0);
	if (Z_TYPE_P(running) == IS_TRUE) {
		php_error_docref(NULL, E_WARNING, "concurrent client has already started");
		RETURN_FALSE;
	}

	if (!yar_concurrent_loop_check_callback(on_success,
				"first argument is expected to be a valid callback")) {
		RETURN_FALSE;
	}
	if (!yar_concurrent_loop_check_callback(on_failure,
				"second argument is expected to be a valid error callback")) {
		RETURN_FALSE;
	}

	/* nothing queued: report success without touching the stored callbacks */
	queue = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0);
	if (Z_ISNULL_P(queue) || zend_hash_num_elements(Z_ARRVAL_P(queue)) == 0) {
		RETURN_TRUE;
	}

	/* publish the loop-level callbacks through the $_callback / $_error_callback static properties */
	if (on_success && !Z_ISNULL_P(on_success)) {
		zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), on_success);
	}
	if (on_failure && !Z_ISNULL_P(on_failure)) {
		zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_error_callback"), on_failure);
	}

	/* flag $_start as true only for the duration of php_yar_concurrent_client_handle() */
	ZVAL_BOOL(running, 1);
	outcome = php_yar_concurrent_client_handle(queue);
	ZVAL_BOOL(running, 0);

	RETURN_BOOL(outcome);
}
可见除了设置回调函数,唯一实际有效的操作是调用php_yar_concurrent_client_handle()
//yar_client.c
/* Turns every queued call in `callstack` into a curl transport, registers all
 * of them with one concurrent scheduler and runs the scheduler.
 *
 * callstack: array of le_calldata resources (Yar_Concurrent_Client::$_callstack).
 * Returns 1 on success; 0 when a request cannot be built, a transport cannot
 * be opened or sent, or the scheduler's exec() fails. On every return path
 * the scheduler (and with it every transport already added) is released.
 */
int php_yar_concurrent_client_handle(zval *callstack) /* {{{ */ {
	char *msg;
	zval *calldata;
	zend_ulong sequence;
	yar_request_t *request;
	const yar_transport_t *factory;
	yar_transport_interface_t *transport;
	yar_transport_multi_interface_t *multi;

	/* only the curl transport is used here; per the original note the socket
	 * (tcp/unix) transport does not support concurrent calls yet */
	factory = php_yar_transport_get(ZEND_STRL("curl"));
	multi = factory->multi->init();

	/* walk the yar_call_data_t entries held by Yar_Concurrent_Client::$_callstack */
	ZEND_HASH_FOREACH_NUM_KEY_VAL(Z_ARRVAL_P(callstack), sequence, calldata) {
		yar_call_data_t *entry;
		long flags = 0;

		entry = (yar_call_data_t *)zend_fetch_resource(Z_RES_P(calldata), "Yar Call Data", le_calldata);

		if (!entry) {
			continue;
		}

		/* a call queued without parameters is sent with an empty array */
		if (Z_ISUNDEF(entry->parameters)) {
			array_init(&entry->parameters);
		}

		transport = factory->init();

		if (YAR_G(allow_persistent)) {
			if (!Z_ISUNDEF(entry->options)) {
				zval *flag = php_yar_client_get_opt(&entry->options, YAR_OPT_PERSISTENT);
				if (flag && (Z_TYPE_P(flag) == IS_TRUE || (Z_TYPE_P(flag) == IS_LONG && Z_LVAL_P(flag)))) {
					flags |= YAR_PROTOCOL_PERSISTENT;
				}
			}
		}

		if (!(request = php_yar_request_instance(entry->method,
						&entry->parameters, Z_ISUNDEF(entry->options)? NULL: & entry->options))) {
			transport->close(transport);
			factory->destroy(transport);
			multi->close(multi);
			return 0;
		}

		/* msg is an in/out argument: it carries the options zval into open()
		 * and is expected to hold an allocated error string when open() fails
		 * (NOTE(review): contract of open() is not visible here - confirm) */
		msg = (char*)&entry->options;
		if (!transport->open(transport, entry->uri, flags, &msg)) {
			php_yar_client_trigger_error(1, YAR_ERR_TRANSPORT, msg);
			php_yar_request_destroy(request);
			transport->close(transport);
			factory->destroy(transport);
			multi->close(multi);
			efree(msg);
			return 0;
		}

		/* entry->uri is a zend_string, so it has to go through ZSTR_VAL for '%s' */
		DEBUG_C(ZEND_ULONG_FMT": call api '%s' at (%c)'%s' with '%d' parameters",
				request->id, ZSTR_VAL(request->method), (flags & YAR_PROTOCOL_PERSISTENT)? 'p' : 'r', ZSTR_VAL(entry->uri),
				zend_hash_num_elements(Z_ARRVAL(request->parameters)));

		/* hand the request payload to the transport */
		if (!transport->send(transport, request, &msg)) {
			php_yar_client_trigger_error(1, YAR_ERR_TRANSPORT, msg);
			php_yar_request_destroy(request);
			transport->close(transport);
			factory->destroy(transport);
			multi->close(multi);
			efree(msg);
			return 0;
		}

		/* remember which call this transport belongs to, for the response callback */
		transport->calldata(transport, entry);
		/* register the transport with the scheduler */
		multi->add(multi, transport);
		php_yar_request_destroy(request);
	} ZEND_HASH_FOREACH_END();

	/* send, receive and dispatch the callbacks */
	if (!multi->exec(multi, php_yar_concurrent_client_callback)) {
		multi->close(multi);
		return 0;
	}

	multi->close(multi);
	return 1;
} /* }}} */
并发调度器
上面的流程大体上和之前介绍的同步调用的流程类似,但是多了一个multi = factory->multi->init();
和该multi
变量的相关调用。
multi相关结构如下:
//yar_transport.h
/* Concurrent scheduler: one instance drives a batch of transports. */
typedef struct _yar_transport_multi_interface {
/* implementation-private state (the curl implementation stores a yar_curl_multi_data_t here) */
void *data;
/* methods */
int (*add)(struct _yar_transport_multi_interface *self, yar_transport_interface_t *cp); /* registers one transport with the scheduler */
int (*exec)(struct _yar_transport_multi_interface *self, yar_concurrent_client_callback *callback); /* runs the registered transports, reporting through callback */
void (*close)(struct _yar_transport_multi_interface *self); /* releases the scheduler */
} yar_transport_multi_interface_t;
/* Factory for concurrent schedulers. */
typedef struct _yar_transport_multi {
struct _yar_transport_multi_interface * (*init)(); /* creates a scheduler; invoked as factory->multi->init() */
} yar_transport_multi_t;
我们称呼上面的那个(yar_transport_multi_interface_t)
类型的变量为 并发调度器
。
看过传输器章节的结构关系分析,这几个结构理解起来应该也毫无压力。
由于C语言中实现“继承”所用的"父类"的结构和“子类实例”的结构是完全一致的,我们不能像PHP/JAVA一样随意往子类中添加成员变量(其实还是有办法的,参考PHP内核的zend_function相关结构的实现,思路不一样),所以上面的(void *)data
变量并不是对应我们理解的对象中的某个变量,而是所有变量,子类对象通过存放不同类型的指针来“持有”不同数量和不同类型的成员变量。
curl
的并发调度器实例中,该data
指针的类型为yar_curl_multi_data_t
:
//curl.c
/* Private state of the curl concurrent scheduler (stored in its `data` pointer). */
typedef struct _yar_curl_multi_data_t {
CURLM *cm;/* libcurl multi handle; the core of the curl scheduler */
yar_transport_interface_t *chs;/* head of the linked list of curl transports added to the scheduler */
} yar_curl_multi_data_t;
另外yar_transport_interface_t
也有一个data
用于同样的用途,curl"子类"实例中其类型为_yar_curl_data_t
:
typedef struct _yar_curl_data_t {
CURL *cp;//libcurl-CURL普通句柄
char persistent;//是否长连接
smart_str buf;.//返回报文存放用的buf
smart_str postfield;//post报文
php_url *host;
yar_call_data_t *calldata;//异步任务
yar_curl_plink_t *plink;//长连接用的
struct curl_slist *headers;//http header头
yar_transport_interface_t *next;//传输器链表
#if LIBCURL_VERSION_NUM < 0x071100
zend_string *address;
#endif
} yar_curl_data_t;
相关数据结构介绍完了,下面看下curl传输器实现中multi的成员方法。
Init()
yar_transport_curl_multi->init()
代表并发调度器的初始化,即构造器,除了内存分配和成员方法设置,核心就是调用libcurl的curl_multi_init
生成一个libcurl-CURLM句柄
/* Constructor of the curl concurrent scheduler: allocates the scheduler and
 * its private state, creates the libcurl multi handle and wires up the
 * add/exec/close methods. Returns the new scheduler. */
yar_transport_multi_interface_t * php_yar_curl_multi_init() /* {{{ */ {
	yar_curl_multi_data_t *state;
	yar_transport_multi_interface_t *scheduler;

	/* zero-filled, so the transport list (chs) starts out empty */
	state = ecalloc(1, sizeof(yar_curl_multi_data_t));
	state->cm = curl_multi_init();

	scheduler = emalloc(sizeof(yar_transport_multi_interface_t));
	scheduler->data  = state;
	scheduler->add   = php_yar_curl_multi_add_handle;
	scheduler->exec  = php_yar_curl_multi_exec;
	scheduler->close = php_yar_curl_multi_close;

	return scheduler;
} /* }}} */
Add()
yar_transport_multi_interface_t->add()
用于向异步调度器注册一个异步调用。
/* Registers one curl transport with the scheduler: prepares the transport,
 * attaches its easy handle to the multi handle and pushes the transport onto
 * the front of the scheduler's transport list. Always returns 1. */
int php_yar_curl_multi_add_handle(yar_transport_multi_interface_t *self, yar_transport_interface_t *handle) /* {{{ */ {
	yar_curl_multi_data_t *state = (yar_curl_multi_data_t *)self->data;
	yar_curl_data_t *easy = (yar_curl_data_t *)handle->data;

	/* libcurl option setup for this transport */
	php_yar_curl_prepare(handle);
	curl_multi_add_handle(state->cm, easy->cp);

	/* the newest transport becomes the list head; `next` is only written
	 * when there is an existing head to link to */
	if (state->chs) {
		easy->next = state->chs;
	}
	state->chs = handle;

	return 1;
} /* }}} */
Exec()
yar_transport_multi_interface_t->exec()
执行异步调度器,基于select事件模型
,使用curl_multi_perform
收发所有数据。
其实Yar差不多从初版开始就有一个epoll事件模型
实现的异步调度器,不过启用需要自行在config.h
开启ENABLE_EPOLL
宏。
/* Runs every transfer registered with the scheduler, using select() to wait
 * for socket activity and curl_multi_perform() to move data.
 *
 * The callback `f` is invoked at two points:
 *   - once with (NULL, YAR_ERR_OKEY, NULL) right after the first
 *     curl_multi_perform() pass, so user code can do other work while the
 *     responses are pending;
 *   - once per finished transfer, via php_yar_curl_multi_parse_response().
 *
 * Returns 1 when all transfers were processed, 0 on error (pending exception,
 * select() failure or timeout). When a callback reports a bailout, the
 * scheduler is closed and zend_bailout() is raised, so this does not return
 * normally.
 */
int php_yar_curl_multi_exec(yar_transport_multi_interface_t *self, yar_concurrent_client_callback *f) /* {{{ */ {
	int running_count, rest_count;
	yar_curl_multi_data_t *multi;

	multi = (yar_curl_multi_data_t *)self->data;
	while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));

	/* the "all requests are on their way" notification (empty arguments) */
	if (!f(NULL, YAR_ERR_OKEY, NULL)) {
		goto bailout;
	}

	/* the user callback may have thrown */
	if (EG(exception)) {
		goto onerror;
	}

	if (running_count) {
		/* keep calling curl_multi_perform until every handle has finished */
		rest_count = running_count;
		do {
			int max_fd, return_code;
			struct timeval tv;
			fd_set readfds;
			fd_set writefds;
			fd_set exceptfds;

			FD_ZERO(&readfds);
			FD_ZERO(&writefds);
			FD_ZERO(&exceptfds);

			curl_multi_fdset(multi->cm, &readfds, &writefds, &exceptfds, &max_fd);
			if (max_fd == -1) {
				/* When max_fd returns with -1, you need to wait a while and then proceed and call
				   curl_multi_perform anyway, How long to wait? I would suggest 100 milliseconds at least */
				tv.tv_sec = 0;
				tv.tv_usec = 50000; /* sleep 50ms */
				select(1, &readfds, &writefds, &exceptfds, &tv);
				while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));
				/* no `continue` here: a do-while `continue` jumps straight to
				 * the loop condition, so transfers that finished during this
				 * pass would leave the loop without being parsed */
			} else {
				/* maybe we should use curl_multi_timeout like:
				 * curl_multi_timeout(curlm, (long *)&curl_timeout);
				 * if (curl_timeout == 0) {
				 *   continue;
				 * } else if (curl_timeout == -1) {
				 *   tv.tv_sec = (ulong)(YAR_G(timeout) / 1000);
				 *   tv.tv_usec = (ulong)((YAR_G(timeout) % 1000) * 1000);
				 * } else {
				 *   tv.tv_sec = curl_timeout / 1000;
				 *   tv.tv_usec = (curl_timeout % 1000) * 1000;
				 * }
				 */
				/* YAR_G(timeout) is in milliseconds: whole seconds go to
				 * tv_sec, the remainder (modulo, not bitwise AND) is converted
				 * to microseconds so tv_usec always stays below 1000000 */
				tv.tv_sec = (ulong)(YAR_G(timeout) / 1000);
				tv.tv_usec = (ulong)((YAR_G(timeout) % 1000) * 1000);

				return_code = select(max_fd + 1, &readfds, &writefds, &exceptfds, &tv);
				if (return_code > 0) {
					while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi->cm, &running_count));
				} else if (-1 == return_code) {
					php_error_docref(NULL, E_WARNING, "select error '%s'", strerror(errno));
					goto onerror;
				} else {
					/* timeout */
					php_error_docref(NULL, E_WARNING, "select timeout %ldms reached", YAR_G(timeout));
					goto onerror;
				}
			}

			/* at least one more transfer finished: dispatch its callback */
			if (rest_count > running_count) {
				int ret = php_yar_curl_multi_parse_response(multi, f);
				if (ret == -1) {
					goto bailout;
				} else if (ret == 0) {
					goto onerror;
				}
				rest_count = running_count;
			}
		} while (running_count);
	} else {
		/* everything finished during the first perform pass */
		int ret = php_yar_curl_multi_parse_response(multi, f);
		if (ret == -1) {
			goto bailout;
		} else if (ret == 0) {
			goto onerror;
		}
	}

	return 1;
onerror:
	return 0;
bailout:
	self->close(self);
	zend_bailout();
	return 0;
} /* }}} */
Close()
yar_transport_multi_interface_t->close()
用于各种资源的清理。
/* Destructor of the curl concurrent scheduler: detaches and closes every
 * transport still on the list, then releases the multi handle, the private
 * state and the scheduler itself. */
void php_yar_curl_multi_close(yar_transport_multi_interface_t *self) /* {{{ */ {
	yar_curl_multi_data_t *state = (yar_curl_multi_data_t *)self->data;
	yar_transport_interface_t *cursor = state->chs;

	while (cursor) {
		yar_curl_data_t *easy = (yar_curl_data_t *)cursor->data;
		/* fetch the successor before close() is called on the current node */
		yar_transport_interface_t *following = easy->next;

		curl_multi_remove_handle(state->cm, easy->cp);
		cursor->close(cursor);
		cursor = following;
	}

	curl_multi_cleanup(state->cm);
	efree(state);
	efree(self);
} /* }}} */
数据提取和反序列化
php_yar_curl_multi_parse_response()
用于从收包完成后的CURLM,解析出response并执行回调。
/* Drains the multi handle's message queue. For every finished transfer it
 * unlinks the matching transport from the scheduler's list, builds a response
 * (unpacked payload or an error) and reports it through `f`.
 *
 * Returns 1 when the queue was drained, 0 when an exception is pending after
 * a callback, and -1 when a callback returned 0 (the caller treats that as a
 * bailout).
 */
static int php_yar_curl_multi_parse_response(yar_curl_multi_data_t *multi, yar_concurrent_client_callback *f) /* {{{ */ {
	int msg_in_sequence;
	CURLMsg *msg;

	do {
		msg = curl_multi_info_read(multi->cm, &msg_in_sequence);
		if (msg && msg->msg == CURLMSG_DONE) {
			uint found = 0;
			yar_transport_interface_t *handle = multi->chs, *q = NULL;

			/* find the transport owning this easy handle and unlink it from the list */
			while (handle) {
				if (msg->easy_handle == ((yar_curl_data_t*)handle->data)->cp) {
					if (q) {
						((yar_curl_data_t *)q->data)->next = ((yar_curl_data_t*)handle->data)->next;
					} else {
						multi->chs = ((yar_curl_data_t*)handle->data)->next;
					}
					found = 1;
					break;
				}
				q = handle;
				handle = ((yar_curl_data_t*)handle->data)->next;
			}

			if (found) {
				long http_code = 200;
				yar_response_t *response;
				yar_curl_data_t *data = (yar_curl_data_t *)handle->data;

				response = php_yar_response_instance();

				if (msg->data.result == CURLE_OK) {
					curl_multi_remove_handle(multi->cm, data->cp);
					if(curl_easy_getinfo(data->cp, CURLINFO_RESPONSE_CODE, &http_code) == CURLE_OK && http_code != 200) {
						/* transfer succeeded but the server answered with a non-200 status */
						char buf[128];
						uint len = snprintf(buf, sizeof(buf), "server responsed non-200 code '%ld'", http_code);

						php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);

						/* report through the error callback */
						if (!f(data->calldata, YAR_ERR_TRANSPORT, response)) {
							/* the callback bailed out (e.g. the user called die()) */
							handle->close(handle);
							php_yar_response_destroy(response);
							return -1;
						}
						if (EG(exception)) {
							/* the callback threw */
							handle->close(handle);
							php_yar_response_destroy(response);
							return 0;
						}
						handle->close(handle);
						php_yar_response_destroy(response);
						continue;
					} else {
						if (data->buf.s) {
							/* named errmsg so it does not shadow the outer CURLMsg *msg */
							char *errmsg = NULL;
							zval *retval, rret;
							yar_header_t *header;
							char *payload;
							size_t payload_len;

							smart_str_0(&data->buf);

							payload = ZSTR_VAL(data->buf.s);
							payload_len = ZSTR_LEN(data->buf.s);

							/* a body shorter than the protocol header cannot be
							 * parsed, and would make payload_len wrap around below */
							if (payload_len < sizeof(yar_header_t)
									|| !(header = php_yar_protocol_parse(payload))) {
								php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
							} else {
								/* skip over the leading header */
								payload += sizeof(yar_header_t);
								payload_len -= sizeof(yar_header_t);

								/* unpack the body into a PHP value
								 * NOTE(review): assumes unpack sets errmsg whenever it fails - confirm */
								if (!(retval = php_yar_packager_unpack(payload, payload_len, &errmsg, &rret))) {
									php_yar_response_set_error(response, YAR_ERR_PACKAGER, errmsg, strlen(errmsg));
								} else {
									/* copy the unpacked value into the response structure */
									php_yar_response_map_retval(response, retval);
									DEBUG_C(ZEND_ULONG_FMT": server response content packaged by '%.*s', len '%ld', content '%.32s'", response->id, 7, payload, header->body_len, payload + 8);
									zval_ptr_dtor(retval);
								}
								if (errmsg) {
									efree(errmsg);
								}
							}
						} else {
							php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
						}

						/* dispatch to the success or error callback according to response->status */
						if (!f(data->calldata, response->status, response)) {
							handle->close(handle);
							php_yar_response_destroy(response);
							return -1;
						}
						if (EG(exception)) {
							handle->close(handle);
							php_yar_response_destroy(response);
							return 0;
						}
					}
				} else {
					/* the transfer itself failed: report libcurl's error text */
					char *err = (char *)curl_easy_strerror(msg->data.result);

					php_yar_response_set_error(response, YAR_ERR_TRANSPORT, err, strlen(err));
					if (!f(data->calldata, YAR_ERR_TRANSPORT, response)) {
						handle->close(handle);
						php_yar_response_destroy(response);
						return -1;
					}
					if (EG(exception)) {
						handle->close(handle);
						php_yar_response_destroy(response);
						return 0;
					}
				}
				handle->close(handle);
				php_yar_response_destroy(response);
			} else {
				php_error_docref(NULL, E_WARNING, "unexpected transport info missed");
			}
		}
	} while (msg_in_sequence);

	return 1;
}
/* }}} */
回调用户注册方法
最后一个核心方法php_yar_concurrent_client_callback()
用于回调用户提供的回调函数
/* Invokes the user-supplied callback for one event of the concurrent client.
 *
 * calldata: the call that finished, or NULL for the one-off notification sent
 *           after all requests have been issued.
 * status:   YAR_ERR_OKEY selects the success callback, anything else the
 *           error callback. A callback given to call() takes precedence over
 *           the one given to loop().
 * response: the response for `calldata`; not used when calldata is NULL.
 *
 * Callback signatures: success function($retval, $callinfo); error
 * function($code, $retval, $callinfo); the initial notification receives two
 * NULL arguments.
 *
 * Returns 0 when a bailout was caught while running the callback, 1 otherwise
 * (including when no callback is registered or the call could not be made).
 */
int php_yar_concurrent_client_callback(yar_call_data_t *calldata, int status, yar_response_t *response) /* {{{ */ {
	zval code, retval, retval_ptr;
	zval callinfo, *callback, *func_params;
	zend_bool bailout = 0;
	uint params_count, i;

	/* read after zend_end_try even when the call never got to fill it in */
	ZVAL_UNDEF(&retval_ptr);

	if (calldata) {
		/* data callback */
		if (status == YAR_ERR_OKEY) {
			if (!Z_ISUNDEF(calldata->callback)) {
				callback = &calldata->callback;
			} else {
				callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), 0);
			}
			params_count = 2;
		} else {
			if (!Z_ISUNDEF(calldata->ecallback)) {
				callback = &calldata->ecallback;
			} else {
				callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_error_callback"), 0);
			}
			params_count = 3;
		}

		/* no usable callback: raise the error, or print the remote return value */
		if (Z_ISNULL_P(callback)) {
			if (status != YAR_ERR_OKEY) {
				if (!Z_ISUNDEF(response->err)) {
					php_yar_client_handle_error(0, response);
				} else {
					php_error_docref(NULL, E_WARNING, "[%d]:unknown Error", status);
				}
			} else if (!Z_ISUNDEF(response->retval)) {
				zend_print_zval(&response->retval, 1);
			}
			return 1;
		}

		if (status == YAR_ERR_OKEY) {
			if (Z_ISUNDEF(response->retval)) {
				php_yar_client_trigger_error(0, YAR_ERR_REQUEST, "%s", "server responsed empty response");
				return 1;
			}
			ZVAL_COPY(&retval, &response->retval);
		} else {
			ZVAL_LONG(&code, status);
			ZVAL_COPY(&retval, &response->err);
		}

		/* $callinfo, the callback's last argument: sequence, uri and remote method name */
		array_init(&callinfo);
		add_assoc_long_ex(&callinfo, "sequence", sizeof("sequence") - 1, calldata->sequence);
		add_assoc_str_ex(&callinfo, "uri", sizeof("uri") - 1, zend_string_copy(calldata->uri));
		add_assoc_str_ex(&callinfo, "method", sizeof("method") - 1, zend_string_copy(calldata->method));
	} else {
		/* initial notification: only the loop()-level success callback applies */
		callback = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), 0);
		if (Z_ISNULL_P(callback)) {
			return 1;
		}
		params_count = 2;
	}

	if (calldata && (status != YAR_ERR_OKEY)) {
		/* error callback: function($code, $retval, $callinfo) */
		func_params = safe_emalloc(sizeof(zval), 3, 0);
		ZVAL_COPY_VALUE(&func_params[0], &code);
		ZVAL_COPY_VALUE(&func_params[1], &retval);
		ZVAL_COPY_VALUE(&func_params[2], &callinfo);
	} else if (calldata) {
		/* success callback: function($retval, $callinfo) */
		func_params = safe_emalloc(sizeof(zval), 2, 0);
		ZVAL_COPY_VALUE(&func_params[0], &retval);
		ZVAL_COPY_VALUE(&func_params[1], &callinfo);
	} else {
		/* initial notification: both arguments are NULL */
		func_params = safe_emalloc(sizeof(zval), 2, 0);
		ZVAL_NULL(&func_params[0]);
		ZVAL_NULL(&func_params[1]);
	}

	/* No return and no cleanup inside the guarded region: leaving a zend_try
	 * block early skips zend_end_try (which restores EG(bailout)), and a
	 * bailout raised by the warning must not find the arguments already
	 * freed. Everything is released exactly once below. */
	zend_try {
		if (call_user_function_ex(EG(function_table), NULL, callback,
					&retval_ptr, params_count, func_params, 0, NULL) != SUCCESS) {
			if (calldata) {
				php_error_docref(NULL, E_WARNING, "call to callback failed for request: '%s'", ZSTR_VAL(calldata->method));
			} else {
				php_error_docref(NULL, E_WARNING, "call to initial callback failed");
			}
		}
	} zend_catch {
		bailout = 1;
	} zend_end_try();

	if (!Z_ISUNDEF(retval_ptr)) {
		zval_ptr_dtor(&retval_ptr);
	}

	for (i = 0; i < params_count; i++) {
		zval_ptr_dtor(&func_params[i]);
	}
	efree(func_params);

	return bailout? 0 : 1;
} /* }}} */