Swoole深入学习-task进程异步任务实现
在前面我们已经了解了swoole的进程模型,同时也了解了同步异步相关的概念,那么本节课我们学习下异步任务的实现。
Task进程是独立与worker进程的一个进程.他主要处理耗时较长的业务逻辑.并且不影响worker进程处理客户端的请求,这大大提高了swoole的并发能力当有耗时较长的任务时,worker进程通过task()函数把数据投递到Task进程去处理
回顾一下:
在worker进程当中我们可以执行客户端提交过来的数据执行相应的业务逻辑,也就是在触发的事件当中,比如接收客户端提交的数据,处理同步的业务,这是在worker进程当中执行,那什么时候需要task进程呢?
情景一:管理员需要给100W用户发送邮件,当点击发送,浏览器会一直转圈,直到邮件全部发送完毕。
情景二:千万微博大V发送一条微博,其关注的粉丝相应的会接收到这个消息,是不是大V需要一直等待消息发送完成,才能执行其它操作
从我们理解的角度思考,这其实都是php进程一直被阻塞,客户端才一直在等待服务端的响应,我们的代码就是同步执行的。
对于用户而言,这就是漫长的等待。如何优雅的提高用户体验就是一个非常棘手的问题。
那么task的目的就是这个,下面我们来介绍下task的使用。
1、worker进程到中,我们调用对应的task()方法发送数据通知到task worker进程
2、task worker进程会在onTask()回调中接收到这些数据,并进行处理。
3、处理完成之后通过调用finsh()函数或者直接return返回消息给worker进程
4、worker进程在onFinsh()进程收到这些消息并进行处理
代码如下:
class taskServer
{
private $serv;
public function __construct() {
$this->serv = new Swoole\Server('0.0.0.0', 9501);
//初始化swoole服务
$this->serv->set(array(
'worker_num' => 8,
'task_worker_num' => 8 //异步任务进程
));
//设置监听
$this->serv->on('Connect', array($this, 'onConnect'));
$this->serv->on("Receive", array($this, 'onReceive'));
$this->serv->on("Close", array($this, 'onClose'));
$this->serv->on("Task", array($this, 'onTask'));
$this->serv->on("Finish", array($this, 'onFinish'));
//开启
$this->serv->start();
}
public function onConnect($serv, $fd) {
}
public function onReceive($serv, $fd, $from_id, $data) {
echo "接收到来自客户端的消息{$fd}:{$data}\n";
$param = array(
'fd' => $fd
);
//投递任务到task进程当中
$serv->task(json_encode($param));
echo "我能不能先执行\n";
}
public function onClose($serv, $fd) {
}
public function onTask($serv, $task_id, $from_id, $data) {
echo "消息任务:{$task_id}来自worker:{$from_id}\n";
echo "数据: {$data}\n";
for($i = 0 ; $i 20 ; $i ++ ) {
sleep(1);
echo "task:{$task_id}-处理中{$i} \n";
}
$fd = json_decode($data, true);
//可以直接通知客户端,看业务逻辑需求
$serv->send($fd['fd'] , "task数据处理完成{$task_id}");
//返回到worker进程
return "task任务处理完成";
}
public function onFinish($serv,$task_id, $data) {
echo "Task:{$task_id}-处理完成 \n";
}
}
$server = new taskServer();
注意事项:
1、开启task功能
task功能默认是关闭的,开启task功能需要满足两个条件
1.配置task进程的数量,即配置task_worker_num这个配置项
2.注册task的回调函数onTask和onFinish
2、task怎么使用?
Task进程其实是要在worker进程内发起的,即我们把需要投递的任务,通过worker进程投递到task进程中去处理。
怎么操作呢?我们可以利用swoole_server->task函数把任务数据投递到task进程池中。
swoole_server->task函数是非阻塞函数,任务投递到task进程中后会立即返回,即不管任务需要在task进程内处理多久,worker进程也不需要任何的等待,不会影响到worker进程的其他操作。但是task进程却是阻塞的,如果当前task进程都处于繁忙状态即都在处理任务,你又投递过来更多任务,这个时候新投递的任务就只能乖乖的排队等task进程空闲才能继续处理。
如果投递的任务量总是大于task进程的处理能力,建议适当的调大task_worker_num的数量,增加task进程数,不然一旦task塞满缓冲区,就会导致worker进程阻塞,我们不期望的结果。
3、Task常见问题:
Task传递数据的大小
数据小于8k直接通过管道传递,数据大于8k写入临时文件传递
onTask会读取这个文件,把他读出来
运行Task,必须要在swoole服务中配置参数task_worker_num,此外,必须给swoole_server绑定两个回调函数:onTask和onFinish。
onTask要return 数据
Task传递对象
默认task只能传递数据,可以通过序列化传递一个对象的拷贝,Task中对象的改变并不会反映到worker进程中数据库连接,网络连接对象不可传递,会引起php报错
Task的onFinish回调
Task的onFinish回调会回调调用task方法的worker进程
四、客户端测试
客户端请求:
客户端代码利用之前的client代码就可以
服务端运行结果:
看箭头指示我们发现最终确实在屏幕上输出了,worker进程当中写的(task处理完成),并且是异步执行最开始就输出了(我能不能先执行)。
热门文章