要永远相信你的灵魂。 --卡尔玛
在《游戏架构方案》一文中提到了游戏架构无非就是看网络框架,业务线程池,数据存取线程池如何实现的。在《Java游戏服业务线程模型一》一文中,我们讲述了一种业务线程模型的实现,本文将讲述另一种实现,即每个玩家维护一个消息队列,看下又如何实现。
首先,凡是需要处理客户端消息的对象,都需要维护一个消息队列,定义如下:
public class BaseMgr{
private AllMgr allMgr = null; //管理所有消息对象的总管理器
private LinkedList<Packet> packets = new LinkedList<Packet>(); //Packet为接收到的客户端消息(数据包)
}
那有哪些需要处理客户端消息呢?首先每个玩家是需要的,因此每个玩家对象都要有这个队列;假如有公会(帮派),它们需要处理所有公会的消息,那么公会也需要有这个队列;假如有世界boss,世界boss要管理所有攻打它的玩家,那么世界boss也要有这个队列……
公会和世界boss的消息量不是很多,那么公会和世界boss可以做成单例;单个玩家的消息量在某个时刻也不是很多,但是所有玩家总得消息量在某一时刻可能就很多了,而且不能让一个线程处理所有玩家的消息,因此最好每个玩家都维护一个消息队列,以便后续分给线程池中不同线程处理。
那么,这个队列就可以抽象出来,让所有要处理消息的对象都继承它,即继承BaseMgr。
那消息来的时候,比如,玩家请求加入公会的协议id为1001,请求攻打世界boss的消息id为2001,请求加好友的消息id为3001,如何找到各自的消息队列呢?
这就需要把所有继承消息队列的对象都管理起来了,通过某种对应能快速找到自己的消息队列。通常每个玩家对象都对应一个各自的玩家rid,通过这个rid可以找到该玩家的消息队列,所以管理这些消息队列的可以用个map来管理。世界boss单例,公会单例通常没有唯一id标识,那我们就可以为这些单例手动定义一个永远不会和玩家rid冲突的唯一id。
因此,管理所有消息队列的对象,我们可定义一个总管理类AllMgr
public class AllMgr {
private Map<Long, BaseMgr> mgrMap = null;//管理所有消息队列对象
private ExecutorService pool = null; //业务线程池
private volatile boolean isRunning;//是否停止线程池
private static corePoolSize = GameConfig.getInstance().getProperty("corePoolSize", 8); //开启业务线程数量
private LinkedBlockingQueue<BaseMgr> msgMgrQueue = null;//只存放有消息的管理器,如某玩家对象,公会单例对象,世界boss单例对象,世界管理器对象等。
public AllMgr(){
mgrMap = new ConcurrentHashMap<Long, BaseMgr>();
pool = Executors.newFixedThreadPool(corePoolSize);
msgMgrQueue = new LinkedBlockingQueue<BaseMgr>();
}
public void runningPool(){
isRunning = true;
for(int i = 0; i < corePoolSize; i++){
pool.excute(this);
}
}
public void putMsgMgrQueue(BaseMgr baseMgr){
msgMgrQueue.put(baseMgr);//若队列已满,发生阻塞,一直等待空间
}
}
单例的唯一id定义就简单了,注意不要和玩家rid,其他单例id冲突即可。
public class MgrDefine{
public static final long MGR_OF_WORLD = 1L;//世界管理器
public static final long MGR_OF_GUILDE = 2L;//帮派
public static final long MGR_OF_WORLDBOSS = 3L;//世界boss
}
收到客户端消息诸如1001,2001,3001是如何找到各自的消息队列对象,然后把消息加入到消息队列中去的呢?
在做业务功能的时候,我们肯定知道哪条协议将由哪个模块处理的,比如请求加入公会,那么应该交由公会队列那边去处理;请求加好友,应该交由玩家自己队列那边去处理,所以这时应该要有个管理协议路由的管理类,我们姑且把它定为MsgRouteMgr。
public class MsgRouteMgr{
private static Map<Integer, MsgHandler> msgHandlerMap = new HashMap<>();//消息处理路由map
}
这样根据协议id,就能找到对应的消息队列处理器。MsgHandlerMgr应至少包含处理协议的类名(用于反射创建相应对象),处理协议的方法名,可定义为:
public class MsgHandler{
public final int cmd; //协议号
public final Class<?> object; //处理协议类的<父类>对象
public final Method method; //处理协议方法
public MsgHandler(int cmd, Class<?> object, String method){
this.cmd = cmd;
this.object = object;
this.method = clazz.getDeclaredMethod(method, Long.class, Packet.class);
}
}
MsgRouteMgr如何初始化呢?
一、可以利用Spring扫描,结合自定义注解的方式,比如自定义注解@HandlerProtoBuf,凡是方法上添加此注解的,都看成是处理客户端协议的(在这个例子中,这个方法带有两个参数, Long rid, Packet packet)。
二、直接手动添加。比如:public class MsgRouteMgr{ …… static{ addMsgHandlerMgr(MsgDefine.JOIN_GUILD, GuildeMgr.class, "joinGuilde"); …… } }
这样,当从网络层(netty或mina)收到客户端的请求协议后,先在AllMgr中找到相应的消息队列对象baseMgr,用baseMgr.offerPacket()即把消息添加到该对象的消息队列中去了,并唤醒业务线程:
public class BaseMgr{
……
public void offerPacket(Packet packet){
synchronized(packets){
packets.add(packet); //添加消息到消息队列
allMgr.putMsgMgrQueue(this); //将消息对象加入总管理器AllMgr中的msgMgrQueue
}
}
public Packet getPacket(){
synchronized(packets){
return packets.isEmpty() ? null : packets.pop();
}
}
}
不知大家是否还记得基于线程池的生产者和消费者模型,模型如下:
LinkedBlockingQueue<E> blockingQueue = new LinkedBlockingQueue<>(10000);
ExecutorService producerTheadPool = Executors.newFixedThreadPool(4);
ExecutorService consumerTheadPool = Executors.newFixedThreadPool(4);
其实这篇的线程模型和这是很相似的,netty或mina的网络层相当于生产者,AllMgr中的msgMgrQueue相当于这里的产品队列blockingQueue,AllMgr中的pool相当于消费者线程池,从msgMgrQueue中取出“产品”,即消息队列对象进行消费。因此,我们需要再写出消费者线程逻辑即可。
AllMgr中业务线程被唤醒后,循环AllMgr下存有消息的消息对象队列msgMgrQueue,每个线程处理其中一个消息对象中的所有消息:
public class AllMgr{
……
@Override
public void run(){
while(isRunning){
BaseMgr baseMgr = null;
try {
baseMgr = msgMgrQueue.take();//若队列为空,发生阻塞,等待有元素
} catch (InterruptedException e) {
e.printStackTrace();
}
if(baseMgr != null){
handlerPacket(baseMgr);
}
}
}
public void handlerPacket(BaseMgr baseMgr){
Packet packet = baseMgr.getPacket();
while(packet != null){
Long rid = packet.getRid();
MsgHandler handler = MsgRouteMgr.getInstance().getMsgHandler(packet.getCmd());
handler.method.invoke(handler.object, rid, packet);
packet = baseMgr.getPacket();
}
}
}
这样,从网络层收到客户端的协议后,先在AllMgr中找到相应的消息队列对象baseMgr,把消息压入该对象的消息队列,并将此对象添加到阻塞队列msgMgrQueue中,注意对象的消息队列和存放对象的阻塞队列它们的区别,当msgMgrQueue一旦不为空时,就会唤醒业务线程池中线程处理msgMgrQueue中的消息对象,从而取出该对象的消息队列中所有消息并处理。从而实现了在单个线程中按序处理某个对象中所有的消息,而不是把该对象中的所有消息同时分散给其他线程处理。从而也达到了与《Java游戏服业务线程模型一》一样的目的,实现了另一种游戏业务线程模型。这种线程模型有个特点,就是不同的消息处理对象之间交流的话,必须也以消息的形式告诉对方,并加在对方的消息队列后面等待处理。
注:这里还需要一个管理所有在线玩家的管理器WorldMgr,玩家在请求登录时,根据登录协议id,路由到这个管理器,在登录协议中,带上玩家的账号信息然后查数据是否有这个玩家,没有则再通知客户端创建角色,因此创建角色的协议,也需路由到WorldMgr管理器。有了玩家对象后,后续就可以把消息添加到玩家对象消息队列中了;在其他单例管理模块获取玩家信息,也能方便获取了。下述的GamePlayer也是继承BaseMgr的:
public WorldMgr extends BaseMgr{ private Map<Long, GamePlayer> players = new ConcurrentHashMap<Long, GamePlayer>(); //所有在线玩家rid -> player }