RabbitMQ+WebSocket实现大屏幕消息推送

作者:八重樱

来源:www.cnblogs.com/a609251438/p/12713467.html


介绍

基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。


思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。


WebSocket 服务


composer require hyperf/websocket-server


配置文件 [config/autoload/server.php]

<?php
return [
'mode' => SWOOLE_PROCESS,
'servers' => [
        [
'name' => 'http',
'type' => Server::SERVER_HTTP,
'host' => '0.0.0.0',
'port' => 11111,
'sock_type' => SWOOLE_SOCK_TCP,
'callbacks' => [
                SwooleEvent::ON_REQUEST => [HyperfHttpServerServer::class, 'onRequest'],
            ],
        ],
        [
'name' => 'ws',
'type' => Server::SERVER_WEBSOCKET,
'host' => '0.0.0.0',
'port' => 12222,
'sock_type' => SWOOLE_SOCK_TCP,
'callbacks' => [
                SwooleEvent::ON_HAND_SHAKE => [HyperfWebSocketServerServer::class, 'onHandShake'],
                SwooleEvent::ON_MESSAGE => [HyperfWebSocketServerServer::class, 'onMessage'],
                SwooleEvent::ON_CLOSE => [HyperfWebSocketServerServer::class, 'onClose'],
            ],
        ],
    ],


WebSocket 服务器端代码示例

<?php
declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
 */

namespace AppController;
use HyperfContractOnCloseInterface;
use HyperfContractOnMessageInterface;
use HyperfContractOnOpenInterface;
use SwooleHttpRequest;
use SwooleServer;
use SwooleWebsocketFrame;
use SwooleWebSocketServer as WebSocketServer;
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
/**
     * 发送消息
     * @param WebSocketServer $server
     * @param Frame $frame
     */

public function onMessage(WebSocketServer $server, Frame $frame): void
{
//心跳刷新缓存
        $redis = $this->container->get(Redis::class);
//获取所有的客户端id
        $fdList = $redis->sMembers('websocket_sjd_1');
//如果当前客户端在客户端集合中,就刷新
if (in_array($frame->fd, $fdList)) {
            $redis->sAdd('websocket_sjd_1', $frame->fd);
            $redis->expire('websocket_sjd_1', 7200);
        }
        $server->push($frame->fd, 'Recv: ' . $frame->data);
    }
/**
     * 客户端失去链接
     * @param Server $server
     * @param int $fd
     * @param int $reactorId
     */

public function onClose(Server $server, int $fd, int $reactorId): void
{
//删掉客户端id
        $redis = $this->container->get(Redis::class);
//移除集合中指定的value
        $redis->sRem('websocket_sjd_1', $fd);
        var_dump('closed');
    }
/**
     * 客户端链接
     * @param WebSocketServer $server
     * @param Request $request
     */

public function onOpen(WebSocketServer $server, Request $request): void
{
//保存客户端id
        $redis = $this->container->get(Redis::class);
        $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);
        var_dump($res1);
        $res = $redis->expire('websocket_sjd_1', 7200);
        var_dump($res);
        $server->push($request->fd, 'Opened');
    }
}



WebSocket 前端代码

function WebSocketTest() {
if ("WebSocket" in window) {
console.log("您的浏览器支持 WebSocket!");
var num = 0
// 打开一个 web socket
var ws = new WebSocket("ws://127.0.0.1:12222");
        ws.onopen = function () {
// Web Socket 已连接上,使用 send() 方法发送数据
//alert("数据发送中...");
//ws.send("发送数据");
        };
window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开
var ping = {"type": "ping"};
            ws.send(JSON.stringify(ping));
        }, 5000);
       ws.onmessage = function (evt) {
var d = JSON.parse(evt.data);
console.log(d);
if (d.code == 300) {
                $(".address").text(d.address)
            }
if (d.code == 200) {
var v = d.data
console.log(v);
                num++
var str = `<div class="item">
                                <p>${v.recordOutTime}</p>
                               <p>${v.userOutName}</p>
                               <p>${v.userOutNum}</p>
                               <p>${v.doorOutName}</p>
                            </div>`

                $(".tableHead").after(str)
if (num > 7) {
                   num--
                    $(".table .item:nth-last-child(1)").remove()
                }
            }
        };
        ws.error = function (e) {
console.log(e)
            alert(e)
        }
        ws.onclose = function () {
// 关闭 websocket
            alert("连接已关闭...");
        };
    } else {
        alert("您的浏览器不支持 WebSocket!");
    }
}


AMQP 组件

composer require hyperf/amqp


配置文件 [config/autoload/amqp.php]

<?php
return [
'default' => [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
        ],
'params' => [
'insist' => false,
'login_method' => 'AMQPLAIN',
'login_response' => null,
'locale' => 'en_US',
'connection_timeout' => 3.0,
'read_write_timeout' => 6.0,
'context' => null,
'keepalive' => false,
'heartbeat' => 3,
        ],
    ],
];
MQ 消费者代码
<?php
declare(strict_types=1);
namespace AppAmqpConsumer;
use HyperfAmqpAnnotationConsumer;
use HyperfAmqpMessageConsumerMessage;
use HyperfAmqpResult;
use HyperfServerServer;
use HyperfServerServerFactory;
/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */

class DemoConsumer extends ConsumerMessage
{
/**
     * rabbmitMQ消费端代码
     * @param $data
     * @return string
     */

public function consume($data): string
{
        print_r($data);
//获取集合中所有的value
        $redis = $this->container->get(Redis::class);
        $fdList=$redis->sMembers('websocket_sjd_1');
        $server=$this->container->get(ServerFactory::class)->getServer()->getServer();
foreach($fdList as $key=>$v){
if(!empty($v)){
                $server->push((int)$v, $data);
            }
        }
return Result::ACK;
    }
}


控制器代码

/**
 * test
 * @return array
 */

public function test()
{
    $data = array(
'code' => 200,
'data' => [
'userOutName' => 'ccflow',
'userOutNum' => '9999',
'recordOutTime' => date("Y-m-d H:i:s", time()),
'doorOutName' => '教师公寓',
        ]
    );
    $data = GuzzleHttpjson_encode($data);
    $message = new DemoProducer($data);
    $producer = ApplicationContext::getContainer()->get(Producer::class);
    $result = $producer->produce($message);
    var_dump($result);
    $user = $this->request->input('user', 'Hyperf');
    $method = $this->request->getMethod();
return [
'method' => $method,
'message' => "{$user}.",
    ];
}


最终效果

RabbitMQ+WebSocket实现大屏幕消息推送


更多精彩:

SpringSecurity + JWT 权限系统

非常强悍的 RabbitMQ 总结,写得真好!

这个IDEA插件,专门解决Maven依赖冲突

还在用Swagger(丝袜哥)生成接口文档?我推荐你试试它…

关注公众号,查看更多优质文章

RabbitMQ+WebSocket实现大屏幕消息推送


有读者问我公众号有没有Java相关的学习资料,我整理了很多Java学习资料和视频放在公众号后台了。

获取方式:关注公众号并回复 Java 领取,更多内容陆续奉上。


明天见(。・ω・。)ノ♡

原创文章,作者:栈长,如若转载,请注明出处:https://www.cxyquan.com/8038.html

发表评论

登录后才能评论

联系我们

400-800-8888

在线咨询:点击这里给我发消息

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息