php消息队列
大雷编程 人气:0常见进程通信方式
System V IPC 总的包括:消息队列,共享内存、信号量。
IPC(内部进程间通信)的使用注意:
- IPC资源仅在本机中使用,不能够跨网络使用(其实进程间通信方式除了socket 通信方式其他都是仅在本机中使用)。
- IPC的资源生存周期与内核相同。除非删除,不然会与系统的生存周期相同。(也就是说如果你不主动删除创建的ipc资源,那么它会一直存在,除非系统关机,它才会被清除)
- (重要)每个IPC都有一个关键字key。每个IPC资源都有唯一的整型标识符,进程可以使用id对此资源进行访问。
系统V IPC消息队列
消息队列实际上是一个队列,由内核维护,使用msgget (msgget() 函数是linux系统调用函数,由c语言编写) 函数来创建一个消息队列,创建成功返回队列 ID
在php 中通过封装 system V IPC
函数来实现操作消息队列,共享内存,与信号量。我们可以通过php官方文档上查看这些函数的定义,其中msg_
为首的函数是用来操作消息队列,sem_
为首的函数是对信号量操作,shm_
为首的函数是对共享内存的操作
其实 php 这些进程扩展说到底就是对 linux c 系统调用函数与c标准函数库的封装,然后在加一点php语言自己的处理,封装成新的函数供 广大 phper 调用,只要在linux 下运行 不管你是 GO, JAVA ,python 其实都是对系统调用函数的封装,只是封装后的名字不同,但是调用的都是linux 内核提供的同一套接口
php创建一个消息队列
<?php $key = ftok('demo1.php',"x");//将文件与ID转换为一个key,这个文件只要真实存在就行,没有特殊限制,一般都指定当前文件名 $msqid = msg_get_queue($key);// 创建一个消息队列,返回一个资源id echo msg_send($msqid,1,"hello"); //往消息队列写入一条数据 hello 到队列中 echo $msqid; // 输出资源id
如何查看创建的消息队列呢?
我们可用通过linux 提供的 ipcs
命令查看
如图,第一列 Message Queues
代表消息队列
key | msqid | owner | perms | used-bytes | messages |
---|---|---|---|---|---|
0x7801620f | 5 | root | 666 | 12 | 1 |
外部标识由调用函数时传入 | 消息队列标识id(用于进程内部通讯)每个队列都有唯一标识,用于区分不同消息队列 | 创建消息队列用户 | 操作权限位 | 消息队列使用了多少字节 | 消息队列有多少条消息 |
通过表格分析我们刚刚创建的消息队列是一个,消息标识符为5
创建用户为root ,操作权限为666,写入了一条数据,占用了12个字节的队列。
看到这,大家可能发现两个问题?
第一个问题:
为什么我调用 php 函数 msg_get_queue()
创建消息队列返回的不是消息队列标识符5
,而是 Resource id #4
因为之前说过 php 函数其实是对linux c 系统调用函数的封装,也就是说 msg_get_queue
函数 其实是对 msgget
系统调用函数的封装,调用msgget
函数的返回内容应该是 消息队列标识id,而现在封装 msgget
函数的 msg_get_queue
函数返回的却是资源描述符,这是为什么?
这其实就是php内部做了处理,返回资源描述符,使得 msg_get_queue
可以配合其他php
函数一起使用,但内部调用msgget
函数返回的一定是 5
,也就是消息队列标识符
如果不相信可使用 linux 下提供的 strace 工具
跟踪系统调用。
第二个问题
通过php函数 msg_send
写入了一条数据,数据内容为 ”hello“
字节长度应该是5个字节,但是消息队列占用的字节长度缺少12个字节?
这是因为php 内部实现对写入数据做了序列化操作,导致写入消息队列的字节长度为12而不是5个字节长度
<?php $a = serialize("hello"); echo 'hello 序列化长度:'.strlen($a)."\n"; echo 'hello 序列化内容:'.$a."\n"; echo "hello 未序列化长度:".strlen("hello")."\n";
当然如果你不想 msg_send
函数写入队列前对数据进行序列化,可以把第3个参数设置为false,不过需要注意的是,如果写入数据不进行序列化,那么使用 msg_receive
函数读取队列数据时也必须设置为不反序列化操作,不然会引发错误
读取队列内容
<?php $key = ftok('demo1.php',"x");//将文件与ID转换为一个key $msqid = msg_get_queue($key);//消息队列如果已创建,直接返回同一个资源描述符 //第一个参数资源描述符,第二个参数一般为0,表示从队列第一条读取,或者设置与msg_send函数第二个参数一致,第三个参数设置缓冲区长度,注意msg_send 函数默认会序列化数据,msg_receive函数默认会将读取数据反序列化,如果缓冲区设置过小会反序列化失败 msg_receive($msqid,1,$received_message_type,1024,$message);//读取消息队列数据,逐条读取, echo $message.PHP_EOL;
成功从消息队列中读取出一条数据,内容hello
因为消息队列内容被读取,消息队列占用字节被清空为0,消息队列消息条数也为0
通过消息队列的读写操作我们发现,他与上一篇的管道有什么区别?
消息队列不像管道通信写入数据,必须读端要打开,读数据,写端必须要打开,不然会阻塞无法通信,消息队列可以随时往队列写入数据,读取数据
关闭序列化功能
写端
<?php $key = ftok('demo3.php',"x");//将文件与ID转换为一个key,这个文件只要真实存在就行,没有特殊限制,一般都指定当前文件名 $msqid = msg_get_queue($key);// 创建一个消息队列,返回一个资源id // 第一个参数资源id,第二个参数消息类型,必须大于0,第三个参数写入队列内容,取消序列化操作 echo msg_send($msqid,1,"hello",false); //往消息队列写入一条数据 hello 到队列中
因为没有对写入数据进行序列化操作,消息队列占用的字节长度变成了5个字节。
读端
<?php $key = ftok('demo3.php',"x");//将文件与ID转换为一个key $msqid = msg_get_queue($key); //第一个参数资源描述符 //第二个参数一般为0,表示从队列第一条读取,或者设置与msg_send函数第二个参数一致 // 第三个参数是msg_send函数的第二个参数的值,也就是发送消息的消息类型 //第四个参数设置接收缓冲区长度,如果设置过小会反序列化失败【如果使用序列化功能时候】 //当接收缓冲区大小设置过小并且关闭了反序列化【发送端发送数据也关闭了】,但是发送的数据超过了设置的长度,就会被截断,并且额外的数据会被丢弃,必须设置第五个参数才有效 // 第五个参数 flags 标志符 msg_receive($msqid,0,$received_message_type,1,$message,false,MSG_NOERROR); echo $message.PHP_EOL;
只读取到一个h
是因为接受缓冲区长度设置为1个字节,所以只接收到一个字节,其它部分被丢弃。
非阻塞读取消息队列
当队列内没有数据,读取队列msg_receive
函数是会阻塞的如何让队列无数据读端也不阻塞继续往下执行,可以通过 设置 flags 标志符 为 MSG_IPC_NOWAIT
,该标志符作用是,不管队列有无数据立即返回结果
<?php $key = ftok('demo5.php',"x");//将文件与ID转换为一个key $msqid = msg_get_queue($key); // fork 一个子进程 $pid = pcntl_fork(); if($pid == 0){ while(1){ // MSG_IPC_NOWAIT【启用非阻塞】 msg_receive 函数其实是封装 linux c msgrcv 系统调用函数,如果设置为非阻塞,函数调用次数非常高,会非常消耗cpu资源 //阻塞模式调用函数次数低,有数据才返回,对cpu友好 $ret = msg_receive($msqid,0,$msgType,1024,$msg,true,MSG_IPC_NOWAIT,$error_code); if($error_code != MSG_ENOMSG){ echo $msg."\n"; } sleep(2); echo "go"."\n"; } exit(0); }
父子进程消息队列通信
<?php $key = ftok('demo5.php',"x");//将文件与ID转换为一个key $msqid = msg_get_queue($key); $pid = pcntl_fork(); if($pid == 0){ while(1){ $ret = msg_receive($msqid,0,$msgType,1024,$msg,true,MSG_IPC_NOWAIT,$error_code); if($error_code != MSG_ENOMSG){ echo $msg."\n"; } } exit(0); } $i = 1; while(1){ msg_send($msqid,2,"hello world",true); sleep(2); if($i++ == 3){ // 杀死子进程 posix_kill($pid,SIGILL); break; } } //子进程退出回收子进程,防止孤儿进程 $pid = pcntl_wait($status); if($pid > 0) { echo "exit pid=".$pid."\n"; }
删除消息队列
<?php $key = ftok('demo5.php',"x");//将文件与ID转换为一个key $msqid = msg_get_queue($key); $pid = pcntl_fork(); if($pid == 0){ while(1){ $ret = msg_receive($msqid,0,$msgType,1024,$msg,true,MSG_IPC_NOWAIT,$error_code); if($error_code != MSG_ENOMSG){ echo $msg."\n"; } } exit(0); } $i = 1; while(1){ msg_send($msqid,2,"hello world",true); sleep(2); if($i++ == 3){ // 杀死子进程 posix_kill($pid,SIGILL); break; } } //子进程退出回收子进程,防止孤儿进程 $pid = pcntl_wait($status); if($pid > 0) { echo "exit pid=".$pid."\n"; } //msg_remove_queue 函数移除消息队列 if(msg_remove_queue($msqid)){ echo "remove ok\n"; }
读取3次队列数据后父进程杀死子进程,父进程回收子进程,然后父进程退出,最后销毁消息队列
删除消息队列也可以直接使用命令 ipcrm -q 7
加载全部内容