一种基于Qt的可伸缩的全异步C/S架构服务器实现(五) 单层无中心集群
C++,Qt,多线程,服务器,集群2016-06-28
对40万用户规模以内的服务器,使用星形的无中心连接是较为简便的实现方式。分布在各个物理服务器上的服务进程共同工作,每个进程承担若干连接。为了实现这个功能,需要解决几个关键问题。
设计在高速局域网中的连接可直接采用TCP,并用第二章介绍的网络传输工具、第三章介绍的流水线线程池共同搭建。引用上述两个工具的代码在cluster子文件夹的 zp_clusterterm.h中定义:
ZPNetwork::zp_net_Engine * m_pClusterNet; ZPTaskEngine::zp_pipeline * m_pClusterEng;
1、监听的地址、端口
2、本节点唯一名称
3、对服务器集群内其他节点发布的连接端口、地址
4、对公网客户端发布的连接端口、地址。
比如,服务器高速局域网网段可能是 10.129.XX.XX,而有些服务器可能以虚拟机(192.168.11.XX)+NAT(10.129.XX.XX)的方式在内网的子网中映射,因此,需要告诉别的服务器节点,如何连接到自己。同时,对公网客户端来说,每个服务器的连接地址又不同了。很有可能也是通过NAT的方式,把数十个内网IP映射到一个外网出口的连续端口上去。这个策略的配置页面如下:
集群的连接策略是,新的服务器进程选取任意一个现有节点,连接后,通过集群内广播系统自动接收其它各个节点的地址,并继续发起连接,直到与现有节点两两相通为止。
为了支持这个策略,集群传输需要定义一些指令。
集群指令在 cluster 文件夹的cross_svr_message.h 定义:
#ifndef CROSS_SVR_MESSAGES_H #define CROSS_SVR_MESSAGES_H #include <qglobal.h> namespace ZP_Cluster{ #pragma pack (push,1) typedef struct tag_cross_svr_message{ struct tag_header{ quint16 Mark; //Always be 0x1234 quint8 messagetype; quint32 data_length; } hearder; union uni_payload{ quint8 data[1]; struct tag_CSM_heartBeating{ quint32 nClients; } heartBeating; struct tag_CSM_BasicInfo{ quint8 name [64]; quint8 Address_LAN[64]; quint16 port_LAN; quint8 Address_Pub[64]; quint16 port_Pub; } basicInfo; struct tag_CSM_Broadcast{ quint8 name [64]; quint8 Address_LAN[64]; quint16 port_LAN; quint8 Address_Pub[64]; quint16 port_Pub; } broadcastMsg[1]; } payload; } CROSS_SVR_MSG; #pragma pack(pop) } #endif // CROSS_SVR_MESSAGES_H指令由头部、载荷两部分组成。
头部header说明:
Mark是一个固定的起始,用于验证流解译的正确性。如果流解译不正确,第二块指令的起始将不是这个值。
messagetype 是一个用来标定指令类型的字节,决定了载荷联合体该采用哪个策略解译
data_length是长度,这里代表载荷的长度
载荷 payload说明:
有三种指令结构体, 心跳结构体用来维持各个服务器之间的心跳,基本信息(basicInfo)用于在连接建立后,向对方告知本节点的信息。广播结构体是用于在本机的服务器列表发生变更时,向所有现有节点广播新的列表。
对传输的用户数据,直接存储在data中。
第一步,准备加入集群的服务器选取集群中任一个节点作为对象,发起P2P连接。
第二步,双方互换信息(basicInfo)
第三步,双方将对方的信息添加到本地的服务器节点表中。服务器节点表是一群zp_ClusterNode类的实例,该类由ZPTaskEngine::zp_plTaskBase派生。这个基类在第三章有介绍。服务器节点对象的实例负责具体的指令解译。该列表如下(在cluster子文件夹的 zp_clusterterm.h中定义):
//important hashes. server name to socket, socket to server name QMutex m_hash_mutex; QMap<QString , zp_ClusterNode *> m_hash_Name2node; QMap<QObject *,zp_ClusterNode *> m_hash_sock2node;
第四步,由于节点表发生变化,因此,会触发对现有节点的广播(broadCasting)
第五步,各个节点收到广播后,会比较广播中的节点信息和自己目前的节点信息,并发起向新增节点的连接。
最终,当一对一连接完成,系统重新处于稳定状态。解译这段信息的代码片段在中cluster文件夹zp_clusternode.cpp的deal_current_message_block方法中实现:
switch(m_currentHeader.messagetype) { \\... case 0x01://basicInfo, when connection established, this message should be used if (bytesLeft==0) { QString strName ((const char *)pMsg->payload.basicInfo.name); if (strName != m_pTerm->name()) { this->m_strTermName = strName; m_nPortLAN = pMsg->payload.basicInfo.port_LAN; m_addrLAN = QHostAddress((const char *)pMsg->payload.basicInfo.Address_LAN); m_nPortPub = pMsg->payload.basicInfo.port_Pub; m_addrPub = QHostAddress((const char *)pMsg->payload.basicInfo.Address_Pub); if (false==m_pTerm->regisitNewServer(this)) { this->m_strTermName.clear(); emit evt_Message(this,tr("Info: New Svr already regisited. Ignored.")+strName); emit evt_close_client(this->sock()); } else { emit evt_NewSvrConnected(this->termName()); m_pTerm->BroadcastServers(); } } else { emit evt_Message(this,tr("Can not connect to it-self, Loopback connections is forbidden.")); emit evt_close_client(this->sock()); } } break; case 0x02: //Server - broadcast messages if (bytesLeft==0) { int nSvrs = pMsg->hearder.data_length / sizeof(CROSS_SVR_MSG::uni_payload::tag_CSM_Broadcast); for (int i=0;i<nSvrs;i++) { QString strName ((const char *)pMsg->payload.broadcastMsg[i].name); if (strName != m_pTerm->name() && m_pTerm->SvrNodeFromName(strName)==NULL) { QHostAddress addrToConnectTo((const char *)pMsg->payload.broadcastMsg[i].Address_LAN); quint16 PortToConnectTo = pMsg->payload.broadcastMsg[i].port_LAN; if (strName > m_pTerm->name()) emit evt_connect_to(addrToConnectTo,PortToConnectTo,false); else emit evt_Message(this,tr("Name %1 <= %2, omitted.").arg(strName).arg(m_pTerm->name())); } } } break; ...
TCP 是面向连接的流式传输。对用户发送的一个大数据包,虽然保证收发的完整性,旦接收方每次接收的数据片段长度是有限的,也是不定的。一种简单的思路是按照指令结构体的长度,直接缓存完整的数据包,而后集中处理。这样有一个问题,在数据包很大时,内存开销过高。因此,本应用设计的思路是边接收、边处理。具体步骤:
1、检查收到的头部是否合法
2、存储当前指令的头部
3、一旦得到一段载荷数据,就回调一次处理过程,处理过程根据需求等待更多数据,或者处理完后清空缓存。这对一次传输100MB数据的应用是很关键的。流式处理需要完成的步骤关键代码如下:
在 zp_ClusterTerm的接收槽里,直接把数据片段压入zp_ClusterNode对象的队列中,并压入流水线。
//some data arrival void zp_ClusterTerm::on_evt_Data_recieved(QObject * clientHandle,QByteArray datablock ) { //Push Clients to nodes if it is not exist zp_ClusterNode * pClientNode = ...; int nblocks = pClientNode->push_new_data(datablock); if (nblocks<=1) m_pClusterEng->pushTask(pClientNode); //... }
class zp_ClusterNode : public ZPTaskEngine::zp_plTaskBase { //..... //Data Process //The raw data queue and its mutex QList<QByteArray> m_list_RawData; QMutex m_mutex_rawData; //The current Read Offset, from m_list_RawData's beginning int m_currentReadOffset; //Current Message Offset, according to m_currentHeader int m_currentMessageSize; //Current un-procssed message block.for large blocks, //this array will be re-setted as soon as some part of data has been //dealed, eg, send a 200MB block, the 200MB data will be splitted into pieces QByteArray m_currentBlock; CROSS_SVR_MSG::tag_header m_currentHeader; //... };
变量 m_currentMessageSize 指的是当前接收的信息的大小。比如100MB 的信息,接受了23MB,这个值就是23MB
变量 m_currentBlock 是当前的缓存。这个缓存会不断的递交处理,负责处理的程序可以根据情况适时清空它。对短指令,不清也是可以的。
变量 m_currentHeader 是当前的信息头部,这个值记录了当前结构体的首部信息。
在线程池中,会调用 zp_ClusterNode::run 虚拟方法。这个方法的关键代码如下(实际代码因为有线程同步,要复杂一些):
int zp_ClusterNode::run() { //nMessageBlockSize 是静态变量,表示最多处理几个块就释放CPU给其他节点 int nMessage = m_nMessageBlockSize; int nCurrSz = -1; while (--nMessage>=0 && nCurrSz!=0 ) { QByteArray block; block = *m_list_RawData.begin(); m_currentReadOffset = filter_message(block,m_currentReadOffset); if (m_currentReadOffset >= block.size()) { m_list_RawData.pop_front(); m_currentReadOffset = 0; } nCurrSz = m_list_RawData.size(); } if (nCurrSz==0) return 0; return -1; }
这个方法的关键代码如下:
//!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader //!return bytes Used. int zp_ClusterNode::filter_message(QByteArray block, int offset) { const int blocklen = block.length(); while (blocklen>offset) { const char * dataptr = block.constData(); //先确保信息的头标志被接收 while (m_currentMessageSize<2 && blocklen>offset ) { m_currentBlock.push_back(dataptr[offset++]); m_currentMessageSize++; } if (m_currentMessageSize < 2) //First 2 byte not complete continue; if (m_currentMessageSize==2) { const char * headerptr = m_currentBlock.constData(); memcpy((void *)&m_currentHeader,headerptr,2); } const char * ptrCurrData = m_currentBlock.constData(); //判断头2个字节是不是1234 if (m_currentHeader.Mark == 0x1234) //Valid Message { //试图接收完整的头部信息 if (m_currentMessageSize< sizeof(CROSS_SVR_MSG::tag_header) && blocklen>offset) { int nCpy = sizeof(CROSS_SVR_MSG::tag_header) - m_currentMessageSize; if (nCpy > blocklen - offset) nCpy = blocklen - offset; QByteArray arrCpy(dataptr+offset,nCpy); m_currentBlock.push_back(arrCpy); offset += nCpy; m_currentMessageSize += nCpy; } //如果头部还没收完则返回 if (m_currentMessageSize < sizeof(CROSS_SVR_MSG::tag_header)) //Header not completed. continue; //除了头部以外,还有数据可用,并且头部刚刚接收完 else if (m_currentMessageSize == sizeof(CROSS_SVR_MSG::tag_header))//Header just completed. { //保存头部 const char * headerptr = m_currentBlock.constData(); memcpy((void *)&m_currentHeader,headerptr,sizeof(CROSS_SVR_MSG::tag_header)); //继续处理后续的载荷 if (block.length()>offset) { //确定还有多少字节没有接收 qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header) -m_currentMessageSize ; //继续接收载荷 if (bitLeft>0 && blocklen>offset) { int nCpy = bitLeft; if (nCpy > blocklen - offset) nCpy = blocklen - offset; QByteArray arrCpy(dataptr+offset,nCpy); m_currentBlock.push_back(arrCpy); offset += nCpy; m_currentMessageSize += nCpy; bitLeft -= nCpy; } //处理一次数据 deal_current_message_block(); if (bitLeft>0) continue; //This Message is Over. Start a new one. m_currentMessageSize = 0; m_currentBlock = QByteArray(); continue; } } //除了头部以外,还有数据可用 else { if (block.length()>offset) { //确定还有多少字节没有接收 qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header) -m_currentMessageSize ; //继续接收载荷 if (bitLeft>0 && blocklen>offset) { int nCpy = bitLeft; if (nCpy > blocklen - offset) nCpy = blocklen - offset; QByteArray arrCpy(dataptr+offset,nCpy); m_currentBlock.push_back(arrCpy); offset += nCpy; m_currentMessageSize += nCpy; bitLeft -= nCpy; } //deal block, may be processed as soon as possible; deal_current_message_block(); if (bitLeft>0) continue; //This Message is Over. Start a new one. m_currentMessageSize = 0; m_currentBlock = QByteArray(); continue; } } // end if there is more bytes to append } //end deal trans message else //... } // end while block len > offset return offset; }
在处理当前块数据的方法 deal_current_message_block里,即可逐一判断消息类型,加以处理了。
集群模块只负责在服务器之间建立连接,并提供一套传输用户数据的通路。在集群建立连接后,用户直接通过
void zp_ClusterTerm::SendDataToRemoteServer(QString svrName,QByteArray SourceArray) { int nMsgLen = sizeof(CROSS_SVR_MSG::tag_header) + SourceArray.size(); QByteArray array(nMsgLen,0); CROSS_SVR_MSG * pMsg =(CROSS_SVR_MSG *) array.data(); pMsg->hearder.Mark = 0x1234; pMsg->hearder.data_length = SourceArray.size(); pMsg->hearder.messagetype = 0x03; memcpy (pMsg->payload.data,SourceArray.constData(),SourceArray.size()); m_hash_mutex.lock(); if (m_hash_Name2node.contains(svrName)) netEng()->SendDataToClient(m_hash_Name2node[svrName]->sock(),array); m_hash_mutex.unlock(); }
void evt_RemoteData_recieved(QString /*svrHandle*/,QByteArray /*svrHandle*/ );
但是,以下问题是不涉及的。
1、传输的数据的具体意义解释
2、全局客户端的UUID哈希和同步
3、客户端数据是否被真正接收。
这些部分留给应用相关部分来具体实现。