基于socket.io的实时消息推送

用户访问Web站点的过程是基于HTTP协议的,而HTTP协议的工作模式是:请求-响应,客户端发出访问请求,服务器端以资源数据响应请求。 也就是说,服务器端始终是被动的,即使服务器端的资源数据发生变化,如果没有来自客户端的请求,用户就不会看到这些变化。 这种模式是不适合某些应用场景的,比如在社交网络用户需要近乎实时地知道其他用户最新的信息。对于普通站点来说, 请求-响应模式可以满足绝大多数的功能需求,但总有某些功能我们希望能够为用户提供实时消息的体验。

为解决这个问题,有两种方案可以选择:

  1. 仍旧使用请求-响应模式,只是增大请求的频率或者使用长连接,来达到尽可能接近实时的效果,如使用polling/long-polling,但可能会极大地增加服务器的负载压力或降低服务器的吞吐量
  2. 使用新的协议,在服务器端有资源数据更新时,主动推送给客户端,如WebSocket,虽然这种思路也是使用了长连接,但效率更高,且是客户端服务器端之间的全双工通信。 问题在于目前各大浏览器并不都支持WebSocket。

那么目前最好的方式就是结合以上两种方案,在不同的浏览器中,尽可能使用浏览器支持的最好的方案,即浏览器支持第二种方案时,优先使用第二种方案,否则使用第一种方案。socket.io就是这么做的,并且在服务器端和客户端对于不同的方案提供统一的接口。


在我们产品的站内信功能中,希望能够给在线用户实时推送公共消息或私有消息。考虑到以后可能还有其他功能需要实现实时消息推送,所以将实时消息推送实现为一个单独的服务。这种针对不同特性的功能进行解耦也为之后针对性的优化做了铺垫。

解耦之后的系统结构如下所示:

socket.io-push-server

当站点服务器(A)监测到资源数据更新事件发生时,先将数据推送到消息推送服务器(B),B根据消息的类型以及消息的目标接收人来决定是否推送,如何推送。

由于我们的Web后端是基于Yii框架实现,那么该如何实现A与B的socket.io服务通信呢?socket.io有自己的一套协议,如果自己实现PHP库来与socket.io服务交互,还有一些工作量。最终我们选择elephant.io这个PHP库,并将elephant.io封装为Yii框架的一个组件,实现如下:

<?php

$basePath = Yii::getPathOfAlias('application.vendor.elephantio.lib.ElephantIO');

require_once($basePath . DIRECTORY_SEPARATOR . 'Client.php');
require_once($basePath . DIRECTORY_SEPARATOR . 'Payload.php');

use ElephantIO\Client as Elephant;

class extElephantIO extends CApplicationComponent
{
    public $host = null;
    public $port = null;
    public $namespace = null;

    private $elephant = null;
    private $ioNameSpace = null;

    public function init()
    {
        if ($this->host === null || $this->port === null) {
            throw new Exception('%s: %s: %s, Please give me parameters host and port', basename(
                __FILE__
            ), __FUNCTION__, __LINE__);
        }

    }

    public function setNameSpace($nameSpace)
    {
        if ($this->elephant === null) {
            $this->elephant = new Elephant('http://' . $this->host . ':'
                . $this->port, 'socket.io', 1, false, true, true);
            $this->elephant->init();
        }
        $this->ioNameSpace = $this->elephant->createFrame(null, $nameSpace);
    }

    public function sendMsg($event, $msg)
    {
        if ($this->ioNameSpace === null) {
            if ($this->namespace !== null) {
                $this->ioNameSpace = $this->elephant->createFrame(null, $this->namespace);
            } else {
                throw new Exception('%s: %s: %s, Please setNameSpace before sendMsg', basename(
                    __FILE__
                ), __FUNCTION__, __LINE__);
            }
        }
        $this->ioNameSpace->emit($event, $msg);
    }

    public function close()
    {
        $this->elephant->close();
        $this->elephant = null;
    }
}

将该代码文件放在应用目录extensions下,然后为Yii添加如下配置项:

'components' => array(
    'ElephantIO' => array(
        'class' => 'application.extensions.extElephantIO',
        'host' => 'xxx',
        'port' => xxx,
    ),
    ...
),

当有资源数据变更事件产生时,如下调用向消息推送服务器发送消息:

$elephant = Yii::app()->ElephantIO;
$elephant->setNameSpace('/message_namespace');
$elephant->sendMsg(
    'message_event_type',
    $messageContent
);       
$elephant->close();

对于私有消息推送,如何判断用户当前是否在线?如何验证用户的身份?

可以基于cookie来实现,socket.io提供的浏览器端JS库,在每次连接时,和普通HTTP请求一样,会携带站点域名下的cookie(我们的消息推送服务的域名为站点服务器域名的子域,所以能拿到站点域名下的所有cookie),消息推送服务器在接收到连接(connection事件)请求时,从连接中取出所有cookie,然后向站点Web后端的某个API转发这些cookie,这个API根据cookie验证用户身份,并将用户信息返回给消息推送服务器,消息推送服务器根据用户信息存储当前连接对象,之后当站点服务器向消息推送服务器发送该用户的消息时,就通过该连接对象给用户推送消息。

对于公有消息,即广播消息,实现则比较简单,直接向当前所有连接推送消息即可。


也许有人会问,既然在某些浏览器中socket.io会退化为使用polling/long-polling来传输消息,那么相比直接向站点服务器进行polling/long-polling,有什么优势吗?

我认为优势有两点:

  1. NodeJS的异步事件回调的方式,适合大并发长连接的应用场景。如果Web后端是使用PHP等实现,则更适合短连接的服务。
  2. 将站点的业务逻辑与消息推送逻辑解耦,那么浏览器通过polling/long-polling来获取消息时,只是涉及消息推送逻辑,不需要执行业务逻辑的代码,而业务逻辑的代码可能很复杂,每次polling,都需要执行一遍的话,会浪费服务器很多资源。