当前位置:首页 > Web开发 > 正文

PHP动静行列队伍实现及应用

2024-03-31 Web开发

动静行列队伍的观点,道理和场景

解耦案例: 行列队伍措置惩罚惩罚订单系统和配送系统

流量削峰案例: Redis的List类型实现秒杀

RabbitMQ: 更专业的动静系统实现方案

 

一.动静行列队伍

动静行列队伍观点

行列队伍布局中间件

动静放入后,不需要当即措置惩罚惩罚

由订阅者/消费者按挨次措置惩罚惩罚

核心布局

业务系统--入队-->动静行列队伍--出队-->行列队伍措置惩罚惩罚系统

应用场景

冗余: 数据需要冗余时,如订单系统后续需要严格的转换和记录,动静行列队伍将这些数据长期化的存储在行列队伍中,动静措置惩罚惩罚措施获打动静,并将该笔记录删除

解耦: 解决了两套系统深度耦合问题,入队系统和出队系统无关

流量削峰: 秒杀和抢购,呈现非常明显的流量剧增,大量的需求集中在几秒钟之内,对处事器的瞬间压力非常大

异步通信: 动静自己就可以使入队的请求直接返回,所以事先了措施的异步操纵

扩展性: 如订单,订单入队后会有财务系统进行措置惩罚惩罚,后期如果添加配货系统,配货系统直接订阅该动静行列队伍即可

排序保证: 有些场景下数据的措置惩罚惩罚挨次非常重要,行列队伍可做成单线程单进单出系统,保证数据凭据挨次措置惩罚惩罚

常见行列队伍实现优错误谬误

Mysql: 可靠性高,易实现,速度慢

Redis: 速度快,单条大动静包时效率低

动静系统: 专业性强,可靠,学习本钱高

动静措置惩罚惩罚的触发机制

死循环方法读取: 易实现,故障时无法及时恢复(时效性强,适合秒杀)

按时任务: 压力均分,有措置惩罚惩罚量上限(按时任务的间隔和数量需要把控,适合订单系统,物流配货系统)

守护进程: 类似于PHP-FPM和PHP-CG,需要shell根本(监听进程检测动静行列队伍中是否有内容,有内容启用出队系统进行措置惩罚惩罚)

 

二.解耦案例:行列队伍措置惩罚惩罚订单系统和配送系统

架构设计

订单系统(接收用户订单)--->

订单行列队伍表--->

配送系统中按时执行的措施读取行列队伍表,将已措置惩罚惩罚的记录进行符号

措施流程

接收用户订单(Order.php)--->

订单系统措置惩罚惩罚--->

行列队伍表(包罗字符:order_id,status,mobile,address,created_at,updated_at)--->

(按时脚本goods.sh每分钟启动)配送措置惩罚惩罚系统(Goods.php)--->

配送系统措置惩罚惩罚

代码实现

1 -- 表布局 2 create table `order_queue`( 3 `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT id号, 4 `order_id` int(11) NOT NULL, 5 `mobile` varchar(20) NOT NULL COMMENT 用户手机号, 6 `address` varchar(100) NOT NULL COMMENT 用户地点, 7 `created_at` datetime NOT NULL DEFAULT 0000-00-00 00:00:00 COMMENT 订单创建的时间, 8 `updated_at` datetime NOT NULL DEFAULT 0000-00-00 00:00:00 COMMENT 措置惩罚惩罚完成的时间, 9 `status` tinyint(2) NOT NULL COMMENT 当前状态,0未措置惩罚惩罚,1已措置惩罚惩罚,2措置惩罚惩罚中, 10 PRIMARY key (`id`) 11 )ENGINE=InnoDB DEFAULT CHARSET=utf8;

1 //db.php 2 <?php 3 // 数据库连接类 4 class DB{ 5 //私有的属性 6 private static $dbcon=false; 7 private $host; 8 private $port; 9 private $user; 10 private $pass; 11 private $db; 12 private $charset; 13 private $link; 14 //私有的结构要领 15 private function __construct(){ 16 $this->host = ‘localhost‘; 17 $this->port = ‘3306‘; 18 $this->user = ‘root‘; 19 $this->pass = ‘root‘; 20 $this->db = ‘imooc‘; 21 $this->charset= ‘utf8‘; 22 //连接数据库 23 $this->db_connect(); 24 //选择数据库 25 $this->db_usedb(); 26 //设置字符集 27 $this->db_charset(); 28 } 29 //连接数据库 30 private function db_connect(){ 31 $this->link=mysqli_connect($this->host.‘:‘.$this->port,$this->user,$this->pass); 32 if(!$this->link){ 33 echo "数据库连接掉败<br>"; 34 echo "错误编码".mysqli_errno($this->link)."<br>"; 35 echo "错误信息".mysqli_error($this->link)."<br>"; 36 exit; 37 } 38 } 39 //设置字符集 40 private function db_charset(){ 41 mysqli_query($this->link,"set names {$this->charset}"); 42 } 43 //选择数据库 44 private function db_usedb(){ 45 mysqli_query($this->link,"use {$this->db}"); 46 } 47 //私有的克隆(实现单例) 48 private function __clone(){ 49 die(‘clone is not allowed‘); 50 } 51 //公用的静态要领(将该要领作为入口) 52 public static function getIntance(){ 53 if(self::$dbcon==false){ 54 self::$dbcon=new self; 55 } 56 return self::$dbcon; 57 } 58 //执行sql语句的要领 59 public function query($sql){ 60 $res=mysqli_query($this->link,$sql); 61 if(!$res){ 62 echo "sql语句执行掉败<br>"; 63 echo "错误编码是".mysqli_errno($this->link)."<br>"; 64 echo "错误信息是".mysqli_error($this->link)."<br>"; 65 } 66 return $res; 67 } 68 //获得最后一笔记录id 69 public function getInsertid(){ 70 return mysqli_insert_id($this->link); 71 } 72 /** 73 * 盘问某个字段 74 * @param 75 * @return string or int 76 */ 77 public function getOne($sql){ 78 $query=$this->query($sql); 79 return mysqli_free_result($query); 80 } 81 //获取一行记录,return array 一维数组 82 public function getRow($sql,$type="assoc"){ 83 $query=$this->query($sql); 84 if(!in_array($type,array("assoc",‘array‘,"row"))){ 85 die("mysqli_query error"); 86 } 87 $funcname="mysqli_fetch_".$type; 88 return $funcname($query); 89 } 90 //获取一笔记录,前置条件通过资源获取一笔记录 91 public function getFormSource($query,$type="assoc"){ 92 if(!in_array($type,array("assoc","array","row"))) 93 { 94 die("mysqli_query error"); 95 } 96 $funcname="mysqli_fetch_".$type; 97 return $funcname($query); 98 } 99 //获取多条数据,,二维数组 100 public function getAll($sql){ 101 $query=$this->query($sql); 102 $list=array(); 103 while ($r=$this->getFormSource($query)) { 104 $list[]=$r; 105 } 106 return $list; 107 } 108 109 public function selectAll($table,$where,$fields=‘*‘,$order=‘‘,$skip=0,$limit=1000) 110 { 111 if(is_array($where)){ 112 foreach ($where as $key => $val) { 113 if (is_numeric($val)) { 114 $condition = $key.‘=‘.$val; 115 }else{ 116 $condition = $key.‘=\"‘.$val.‘\"‘; 117 } 118 } 119 } else { 120 $condition = $where; 121 } 122 if (!empty($order)) { 123 $order = " order by ".$order; 124 } 125 $sql = "select $fields from $table where $condition $order limit $skip,$limit"; 126 $query = $this->query($sql); 127 $list = array(); 128 while ($r= $this->getFormSource($query)) { 129 $list[] = $r; 130 } 131 return $list; 132 } 133 /** 134 * 界说添加数据的要领 135 * @param string $table 表名 136 * @param string orarray $data [数据] 137 * @return int 最新添加的id 138 */ 139 public function insert($table,$data){ 140 //遍历数组,得到每一个字段和字段的值 141 $key_str=‘‘; 142 $v_str=‘‘; 143 foreach($data as $key=>$v){ 144 // if(empty($v)){ 145 // die("error"); 146 // } 147 //$key的值是每一个字段s一个字段所对应的值 148 $key_str.=$key.‘,‘; 149 $v_str.="‘$v‘,"; 150 } 151 $key_str=trim($key_str,‘,‘); 152 $v_str=trim($v_str,‘,‘); 153 //判断数据是否为空 154 $sql="insert into $table ($key_str) values ($v_str)"; 155 $this->query($sql); 156 //返回上一次增加操做孕育产生ID值 157 return $this->getInsertid(); 158 } 159 /* 160 * 删除一条数据要领 161 * @param1 $table, $where=array(‘id‘=>‘1‘) 表名 条件 162 * @return 受影响的行数 163 */ 164 public function deleteOne($table, $where){ 165 if(is_array($where)){ 166 foreach ($where as $key => $val) { 167 $condition = $key.‘=‘.$val; 168 } 169 } else { 170 $condition = $where; 171 } 172 $sql = "delete from $table where $condition"; 173 $this->query($sql); 174 //返回受影响的行数 175 return mysqli_affected_rows($this->link); 176 } 177 /* 178 * 删除多条数据要领 179 * @param1 $table, $where 表名 条件 180 * @return 受影响的行数 181 */ 182 public function deleteAll($table, $where){ 183 if(is_array($where)){ 184 foreach ($where as $key => $val) { 185 if(is_array($val)){ 186 $condition = $key.‘ in (‘.implode(‘,‘, $val) .‘)‘; 187 } else { 188 $condition = $key. ‘=‘ .$val; 189 } 190 } 191 } else { 192 $condition = $where; 193 } 194 $sql = "delete from $table where $condition"; 195 $this->query($sql); 196 //返回受影响的行数 197 return mysqli_affected_rows($this->link); 198 } 199 /** 200 * [改削操纵description] 201 * @param [type] $table [表名] 202 * @param [type] $data [数据] 203 * @param [type] $where [条件] 204 * @return [type] 205 */ 206 public function update($table,$data,$where,$limit=0){ 207 //遍历数组,得到每一个字段和字段的值 208 $str=‘‘; 209 foreach($data as $key=>$v){ 210 $str.="$key=‘$v‘,"; 211 } 212 $str=rtrim($str,‘,‘); 213 if(is_array($where)){ 214 foreach ($where as $key => $val) { 215 if(is_array($val)){ 216 $condition = $key.‘ in (‘.implode(‘,‘, $val) .‘)‘; 217 } else { 218 $condition = $key. ‘=‘ .$val; 219 } 220 } 221 } else { 222 $condition = $where; 223 } 224 225 if (!empty($limit)) { 226 $limit = " limit ".$limit; 227 }else{ 228 $limit=‘‘; 229 } 230 //改削SQL语句 231 $sql="update $table set $str where $condition $limit"; 232 $this->query($sql); 233 //返回受影响的行数 234 return mysqli_affected_rows($this->link); 235 } 236 }

温馨提示: 本文由Jm博客推荐,转载请保留链接: https://www.jmwww.net/file/web/33270.html