中心服务器是连接其他服务器并控制其他服务器的中心枢纽。
设计上:
(1)控制消息转发流程
(1-1)中心服务器的全局连接管理器管理其他服务器连接到中心服务器的连接 (g_center_session_manager)。
(1-2)登录连接管理器管理连接到登录服务器的会话。
(1-3)gm连接管理器管理连接到gm服务器的会话。
(2)其他服务器启动控制
根据服务器表来控制其他服务器启动依赖关系。动态服务器需要等待静态服务器启动了才能启动。
(3)登录流程控制
(3-1)登录临时会话管理器管理临时登陆会话。
(3-2)加入登录临时会话到登录管理器。如果已有该登录会话(表示同一连接正在登录中),并且中心角色在线,那么发送强行存档到网关。
(3-3)登录成功就添加账号到中心服务器(适用于账号与角色一一对应的情况)。如果该玩家已登录就注销该玩家。已经登陆但找不到用户就移除该账号。成功登录,就发送消息到网关。
1、中心服务器初始化
初始化mysql句柄池,从数据库获取中心服务器配置,初始化网络被动连接线程池,初始化网络主动连接线程池,主动连接到登录服务器(和gm服务器),
绑定服务器端口(初始化监听socket,设置收发缓冲区大小(窗口大小128k),并把监听socket注册到epoll描述符里)。
bool center_server::init()
{
mysqlPool = new mysql_handle_pool();
if (NULL == mysqlPool
|| !mysqlPool->putUrl(0, g_xml_config.get_string("Global.MYSQL.Config")))
{
g_log->error("连接数据库失败");
return false;
}
strncpy(pstrIP, tcp_socket::getIPByIfName(g_xml_config.get_string("Global.NetCard.Config")), MAX_IP_LENGTH - 1);
if (!getServerInfo())//从数据库或配置文件获取中心服务器的配置
return false;
//初始化连接线程池(网络被动连接线程池)
taskPool = new tcp_session_pool(g_xml_config.get_integer("Global.ThreadCarryLink.Config"));
if (NULL == taskPool || !taskPool->init())
return false;
//初始化客户端连接池(网络主动连接线程池)
clientPool = new tcp_client_pool(8,50000);
if (NULL == clientPool || !clientPool->init())
return false;
if(!login_client::init(clientPool))//主动连接到登录服务器
{
g_log->error("初始化登陆服务器模块...失败");
return false;
}
#ifdef GMSERVER
if(!gm_client::init(clientPool))//主动连接到gm服务器
{
g_log->error("初始化登陆服务器模块...失败");
return false;
}
#endif
g_log->info("初始化登陆服务器模块...成功");
if( !g_main_logic_thread.start())
{
g_log->error("初始化main_logic_thread模块...失败");
return false;
}
g_log->info("初始化main_logic_thread模块...成功");
if (!server_base::bind_socket(wdPort))//绑定服务器端口(初始化监听socket,设置收发缓冲区大小(窗口大小128k),并把监听socket注册到epoll描述符里)
{
return false;
}
return true;
}
2、其他类型服务器的启动验证
(1)检查服务器依赖
检查服务器依赖,如果其他被依赖的服务器都启动了就可以通知该服务器来启动
除了中心服务器、登录服务器、运营服务器,其他服务器需要等待静态服务器的启动之后才能启动。
服务器之间依赖的标准来自于数据库的静态表。
静态服务器:全局唯一并在数据库服务器表中配了的都是,如db服务器、中心服务器、登录服务器、运营服务器、社会关系服务器。
动态服务器:可启动多个的,如场景服务器和网关服务器。
int center_session::waitSync()
{
...
unsigned char ptrmsg[tcp_socket::MAX_DATASIZE];
int msglen = mSocket.recvToCmd_NoPoll(ptrmsg, sizeof(ptrmsg));
if (msglen > 0)
{
using namespace MSG::Server;
stOKStartServerCmd *ptCmd = (stOKStartServerCmd *)ptrmsg;//服务器启动了会发送这个消息来表明其已经启动
if (START_SERVERCMD == ptCmd->first
&& OK_START_SERVERCMD_PARA == ptCmd->second
&& id == ptCmd->ServerID)
{
sendmsg(ptCmd,sizeof(stOKStartServerCmd));
//debug("客户端连接同步验证成功");
g_log->error("服务器连接同步验证成功,收到消息(%u,%u)",ptCmd->first,ptCmd->second);
return 1;
}
else
{
return 0;
}
}
if(!hasNotifyOther && _sequence_timer(main_logic_thread::currentTime)
&& g_center_session_manager.checkDependence(getType()))//检查服务器依赖,如果其他被依赖的服务器都启动了就可以通知该服务器来启动
{
if(notifyOther())
{
hasNotifyOther=true;
}
}
if(!hasNotifyMe && hasNotifyOther && notifyReturnOk())
{
hasNotifyMe=true;
notifyMe();
}
return 0;
}
(2)获取数据库中的服务器依赖
检查服务器的服务器之间依赖,检查其他服务器依赖的静态服务器是否都已经启动。
中心服务器的连接管理器中可检查获取其他服务器对中心服务器的连接,以此来判断其他服务器是否启动了并连接上中心服务器。
const bool center_session_manager::checkDependence(uint16 type)
{
mysql_record_set *recordset=NULL;
std::ostringstream oss;
mysql_record column,where;
mysql_handle *handle = center_server::mysqlPool->getHandle();
if (!handle)
{
g_log->error("不能从数据库连接池获取连接句柄");
return false;
}
//除了中心服务器、登录服务器、运营服务器,其他服务器需要等待静态服务器的启动之后才能启动
oss << "type<" << type << " and type!=" << CENTERSERVER << " and type!= " \
<< LOGINSERVER << " and type!= " << OPERATIONLOGSERVER;
where.put("type",oss.str());
column.put("id");
column.put("dynamic");
column.put("name");
recordset = handle->exeSelect("SERVERLIST",&column,&where);
unsigned int i;
bool bret=true;
if(recordset)
{
g_log->debug("服务器类型:%d找到依赖服务器个数:%d",type,recordset->size());
for(i = 0; i < recordset->size(); i++)
{
center_session *task = g_center_session_manager.getTaskByID((uint16)recordset->get(i)->getvalue("id"));
uint32 dynamic = recordset->get(i)->getvalue("dynamic");
if(!dynamic && (!task || task->getState() != tcp_session::okay || !task->notifyMeRetornOk()))
{
g_log->debug("等待服务器:%s的启动",(const char *)recordset->get(i)->getvalue("name"));
bret=false;
break;
}
}
}
SAFE_DELETE(recordset);
center_server::mysqlPool->putHandle(handle);
return bret;
}
3、转发服务器消息
(1)转发系统消息
转发服务器消息和广播服务器之间消息(这里接口是处理服务器启动、关闭和系统时间等系统消息)。
其中:中心连接管理器包含所有被动连接。登录连接管理器包含到登录服务器的连接。
bool center_session::msgParseStart(const MSG::base_msg *ptrMsg, const unsigned int msglen)
{
...
case TIMESYNC_START_SERVERCMD_PARA://调整服务器的时间
{
stTimeSyncStartServerCmd *rev = (stTimeSyncStartServerCmd *)ptrMsg;
realtime::set_timesync(rev->sync_sec);
g_log->debug("调整系统时间相对值:%d",rev->sync_sec);
g_center_session_manager.broadcast(rev,sizeof(stTimeSyncStartServerCmd));//转发到连接中心会话的其他服务器
g_login_client_manager.broadcast(rev, sizeof(stTimeSyncStartServerCmd));//转发到登录服务器
return true;
}
break;
case SHUTDOWN_START_SERVERCMD_PARA://广播服务器关闭消息(服务器关闭需要发送消息到中心服务器)
{
stShutdownStartServerCmd *rev = (stShutdownStartServerCmd *)ptrMsg;
//其他服务器会连接中心服务器,关闭时发送这个消息过来,中心服务器再广播到所有连接它的服务器
g_center_session_manager.broadcast(rev,sizeof(stShutdownStartServerCmd));
return true;
}
break;
...
}
(2)转发和处理逻辑消息
转发服务器消息和广播服务器之间消息。
使用直发包头消息:其他服务器发来的直发消息,不再转发到其他服务器了。
使用转发包头消息:服务器之间的消息通过中心服务器转发的消息,和中心服务器主动发送的消息
(这里的是处理角色登录消息(处理登录、登出、换场景消息,往中心服务器增减中心服务器玩家))。
(stFLServerForwardServerCmd 是服务器之间转发消息在普通消息上加的包头,含服务器id字段和数据长度字段(普通消息长度))
bool center_session::msgParseForward(const MSG::base_msg *ptrMsg, const unsigned int msglen)
{
case SEND_USERCMD_TO_SERVERUSER_FORWARD_SERVERCMD://其他服务器发来的直发消息(不再转发到其他服务器了),会使用这个直发包头
{
stSendplayerCmdToServerplayerForwardServerCmd *rev = (stSendplayerCmdToServerplayerForwardServerCmd *)ptrMsg;
center_player* pplayer = g_center_player_manager.get_player_by_id(rev->charid);
if(pplayer)
{
pplayer->usermsgParse((MSG::base_msg *)rev->data, rev->datasize());//处理中心服务器角色终结、注销、挂起、锁定
}
return true;
}
break;
case FL_SERVER_FORWARD_SERVERCMD:
{
//服务器之间的消息通过中心服务器转发和中心服务器主动发送的转发消息会使用一个转发包头
stFLServerForwardServerCmd *rev = (stFLServerForwardServerCmd *)ptrMsg;
if(!rev->id)//转发消息不解析消息体,根据消息头的服务器id转发到对应的服务器上
{
g_login_client_manager.broadcast(rev->data, rev->datasize());//广播登录消息到登录服务器(连接到登录服务器的连接管理器)
}
else
{
g_login_client_manager.broadcastToID(rev->id,rev->data, rev->datasize());//广播登录消息到登录服务器(按服务器id)
}
return true;
}
break;
case SHARE_FORWARD_SERVERCMD:////服务器之间的消息通过中心服务器转发和中心服务器主动发送的登录消息会使用一个转发包头 (处理登录相关消息)
{
stShareForwardServerCmd *rev = (stShareForwardServerCmd *)ptrMsg;
loginmsgParse((MSG::base_msg*)rev->data,rev->datasize());//处理角色登录消息(处理登录、登出、换场景消息,往中心服务器增减中心服务器玩家)
return true;
}
break;
...
}
其中:转发消息构造方式
uint8 buf[tcp_socket::MAX_DATASIZE];
MSG::Server::stFLServerForwardServerCmd *sendmsg=(MSG::Server::stFLServerForwardServerCmd *)buf;
constructInPlace(sendmsg);//构造对象, 模板函数placement new : template<class T> inline void constructInPlace(T *_Ptr){ new ((void *)_Ptr) T();}
sendmsg->id = serverid;
sendmsg->num=msglen;
bcopy(ptrmsg,sendmsg->data,msglen);//拷贝消息数据到转发消息体
4、登陆控制
(1)中心服务器处理登录服务器连接发来的登录消息
处理登录中心角色的流程:
1)加入登录临时会话到登录管理器
2)下线存档正在在线的玩家
3)如果角色在线,那么强行存档
4)添加账号到中心服务器
5)如果该玩家已登录就注销该玩家
6)已经登陆但找不到用户就移除该账号
7)开始登录,发送消息到网关
8)成功登录
bool login_client::msgparser_login(const MSG::base_msg *ptrMsg,const unsigned int msglen)
{
switch(ptrMsg->second)
{
using namespace MSG::FL;
case MSG::FL::PARA_SESSION_NEWSESSION://登录服务器发来的登录消息
{
t_NewLoginMsg *rev = (t_NewLoginMsg *)ptrMsg;
t_NewLoginMsg newsession;
newsession.session = rev->session;
LoginCommand *loginSession = new LoginCommand(rev->session);
if(loginSession)
{
bool ok = true;
(1-1)加入登录临时会话到登录管理器
if(!g_loginsession_mgr.addSession(loginSession))//g_loginsession_mgr.addSession 可以查看该玩家是否还在登录流程
{
//账号正在网关登陆,有玩家的话就需要进行下线处理(存档和提示前端)
error_log("账号正在网关登陆%s(%u,%u)",rev->session.account,rev->session.accid,rev->session.loginTempID);
ok = false;//登录失败,有该玩家正在登录
}
(1-2)下线存档正在在线的玩家
center_player *c_player = g_center_player_manager.get_player_by_accid(rev->session.accid);
if(c_player) ////如果角色在线。那么强行存档
{
MSG::Share::t_Force_Save_Player_Data send;
send.charid = c_player->id;
strncpy(send.name, c_player->name.c_str(), MAX_NAME_LEN);
debug_log("角色下线:%u,%s", c_player->id, c_player->name.c_str());
MSG::center::stTerminateErrorcenterCmd logoutsend;
logoutsend.accid = rev->session.accid;
c_player->terminate_error = false;
if(c_player->gateway)
{
debug_log("通知网关:%u 角色下线:%u,%s", c_player->gateway->id, c_player->id, c_player->name.c_str());
c_player->gateway->sendmsg(&logoutsend,sizeof(logoutsend));
}
......
}
(1-3)添加账号到中心服务器
if(ok && !g_center_player_manager.addAccid(rev->session.accid, rev->session.wdGatewayID))
{
(1-4)如果该玩家已登录就注销该玩家
center_player *user = g_center_player_manager.get_player_by_accid(rev->session.accid);
if(user)//注销该玩家
{
if( user->terminate_error )//(1-5)该玩家终结有误的发送终结消息
{
info_log("错误终止消息");
MSG::center::stTerminateErrorcenterCmd send;
send.accid = rev->session.accid;
user->terminate_error = false;
user->gateway->sendmsg(&send,sizeof(send));
}
if(!user->terminate_unreg)//(1-6)还没注销的玩家就是重登录,发送终结消息给原来的客户端
{
MSG::center::stTerminateErrorcenterCmd send;
send.accid = rev->session.accid;
user->terminate_error = false;
user->gateway->sendmsg(&send,sizeof(send));
warn_log("警告,【%s,%u】试图从别处登陆该游戏帐号",user->name.c_str(), user->id);
ok = true;
}
}
else
{
//(1-7)已经登陆但找不到用户就移除该账号
error_log("已经登陆但找不到用户,%s(%u,%u)",rev->session.account, rev->session.accid,rev->session.loginTempID);
g_loginsession_mgr.removeSession(loginSession);
SAFE_DELETE(loginSession);
g_center_player_manager.removeAccid(rev->session.accid);
ok = false;
return false;
}
}
(1-8)开始登录,发送消息到网关
if(ok && !g_login_client_manager.sendmsgToGateway(newsession.session.wdGatewayID, &newsession,sizeof(newsession)))
{
error_log("帐号%s(%u,%u)登陆时网关已经关闭",rev->session.account, rev->session.accid,rev->session.loginTempID);
g_loginsession_mgr.removeSession(loginSession);
SAFE_DELETE(loginSession);
g_center_player_manager.removeAccid(rev->session.accid);
ok = false;
return true;
}
//成功登录(或重登录)
debug_log("帐号登陆%s(%u,%u,%u)",rev->session.account, rev->session.accid,rev->session.loginTempID,rev->session.wdGatewayID);
}
return true;
}
break;
default:
break;
}
g_log->error("%s(%u, %u, %u)", __PRETTY_FUNCTION__, ptrMsg->first, ptrMsg->second, msglen);
return false;
}
(2)登录临时会话管理
删除过期的会话,删除中心服务器账号。
在中心服务器的逻辑线程循环里遍历删除需要删除的会话列表:
LoginVerifyTimeOut exec(main_logic_thread::currentTime);
g_loginsession_mgr.traverse(exec);
for(std::list<LoginCommand *>::iterator iter = exec._del.begin() ; iter != exec._del.end() ; ++ iter)
{
g_log->debug("通知网关删除Social:(%d,%d)", (*iter)->accid, (*iter)->loginTempID);
g_loginsession_mgr.removeSession(*iter);
SAFE_DELETE(*iter);
}