在看到本文之前,如果读者没看过笔者的前文Mina网络通信框架 ,请先翻阅。
在本节中,我将用 Mina 框架,同时实现服务器和客户端,实现客户端与客户端之间的 IM 通信,使用 Java JDBC 接口连入 MySQL 数据库,实现一个具有登录功能的简易 IM 聊天工具。
首先,我们来看看效果图:
使用 Navicat 可视化操作工具创建并连接数据库
启动 Mina 服务器
启动登录界面
注册失败演示
注册成功演示
观察数据库,发现添加了新用户
登录后的聊天窗口
再登录一个用户
发送消息演示
下面,笔者贴出几个核心代码部分:
数据库登录处理逻辑
package database;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.mina.core.session.IoSession;
import util.MessageUtil;
import util.SessionUtil;
import common.Common;
/**
* 管理客户端的数据库
*/
public class ClientJDBC {
public ClientJDBC() {
try {
// 加载JDBC Driver
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
/**
* <pre>
* 1、查询是否已存在此用户
* 2、注册,添加新数据
*/
public void addNewData(String name, String password, IoSession loginSession) {
try {
// 建立连接
Connection connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/clientdb", "root", "");
// 创建Statement
Statement statement = connection.createStatement();
// 查询是否已存在此用户
String selectSQL = "select password from client where name=" + "'"
+ name + "'";
ResultSet set = statement.executeQuery(selectSQL);
// 如果不存在此用户
if (!set.next()) {
// 添加数据
PreparedStatement preparedStatement = connection
.prepareStatement("insert into client values(null,?,?)");
preparedStatement.setString(1, name);
preparedStatement.setString(2, password);
preparedStatement.executeUpdate();
// 提示用户注册成功
loginSession.write(Common.CANREG);
} else {
// 提示用户已注册
loginSession.write(Common.CANTRE);
}
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
/**
* <pre>
* 1、查询帐号密码是否匹配
* 2、查询是否用户已登录
* 3、登录
*/
public boolean login(String name, String password, IoSession userSession) {
try {
// 建立连接
Connection connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/clientdb", "root", "");
// 创建Statement
Statement statement = connection.createStatement();
// 查询是否已存在此用户
String selectSQL = "select password from client where name=" + "'"
+ name + "'";
ResultSet set = statement.executeQuery(selectSQL);
// 如果存在此用户
if (set.next()) {
// 密码匹配成功
if (set.getString("password").equals(password)) {
// 如果用户没登录
if (SessionUtil.getSessionFromName(name) == null) {
// 存储用户名
userSession.setAttribute("name", name);
// 登录成功
String s = name;
String message = MessageUtil.createSendMessage(Common.LOGIN_SUCCESS, s);
userSession.write(message);
// 发送在线列表给所有客户端
SessionUtil.sendClientListToAll();
} else {
// 提示用户已登录
userSession.write(Common.IS_LOGIN);
}
} else {
// 提示帐号和密码不匹配
userSession.write(Common.ISNT_MATCH);
}
} else {
// 提示帐号和密码不匹配
userSession.write(Common.ISNT_MATCH);
}
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
return true;
}
}
服务器端Handler处理逻辑
注意,此处继承的是StreamIoHandler,为了后续对传输文件进行操作
package server;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;
import thread.IoStreamThreadWork;
import util.SessionUtil;
import common.Common;
/**
* Mina Handler
*/
public class ServerHandler extends StreamIoHandler {
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
System.out.println(session.getRemoteAddress() + " "
+ "exceptionCaught");
System.out.println(cause);
}
/**
* mina只能接收以换行符结束的信息
*/
@Override
public void messageReceived(IoSession session, Object message) {
System.out.println(session.getRemoteAddress() + " "
+ "messageReceived");
// 解析信息
String result = (String) message;
// 注册信息
if (result.startsWith(Common.REGISTER) && result.length() > 10) {
String suffix = result.substring(10);
String[] array = suffix.split(",");
MinaServer.clientJDBC.addNewData(array[0], array[1], session);
}
// 登录信息
else if (result.startsWith(Common.LOGIN) && result.length() > 10) {
String suffix = result.substring(10);
String[] array = suffix.split(",");
MinaServer.clientJDBC.login(array[0], array[1], session);
}
// 登出信息
else if (result.equals(Common.LOGOUT) && result.length() == 6) {
// 关闭当前session
session.close();
// 发送在线列表给所有客户端
SessionUtil.sendClientListToAll();
}
// 用户之间发送消息
else if (result.startsWith(Common.CLIENT) && result.length() > 10) {
String suffix = result.substring(10);
// 分隔选中用户名和要发送的消息
String[] array = suffix.split(Common.SEPARATOR);
// 向指定用户发送消息
if (session.getAttribute("name") != null)
SessionUtil.sendToSelectedClient(
(String) session.getAttribute("name"), array[0],
array[1]);
}
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
System.out.println(session.getRemoteAddress() + " " + "messageSent");
}
@Override
public void sessionClosed(IoSession session) throws Exception {
System.out
.println(session.getRemoteAddress() + " " + "sessionClosed");
}
/**
* 用户创建的时候保留会话的ip和端口
*/
@Override
public void sessionCreated(IoSession session) throws Exception {
System.out.println(session.getRemoteAddress() + " "
+ "sessionCreated");
// 提取客户端ip和端口
String s = session.getRemoteAddress() + "";
String port = s.substring(s.indexOf(":") + 1);
String ip = s.substring((s.indexOf("/") + 1),
s.length() - 1 - port.length());
// 存储ip和端口
if (session.getAttribute("ip") == null)
session.setAttribute("ip", ip);
if (session.getAttribute("port") == null)
session.setAttribute("port", port);
}
/**
* 会话空闲状态
*/
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
System.out.println(session.getRemoteAddress() + " " + "sessionIdle");
}
@Override
public void sessionOpened(IoSession session) {
System.out
.println(session.getRemoteAddress() + " " + "sessionOpened");
}
@Override
protected void processStreamIo(IoSession session, InputStream input,
OutputStream output) {
System.out.println(session.getRemoteAddress() + " "
+ "processStreamIo");
// 设定一个线程池
// 参数说明:最少数量3,最大数量6 空闲时间 3秒
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 6, 3,
TimeUnit.SECONDS,
// 缓冲队列为3
new ArrayBlockingQueue<Runnable>(3),
// 抛弃旧的任务
new ThreadPoolExecutor.DiscardOldestPolicy());
FileOutputStream fos = null;
// 此处路径如何动态设定。
File receiveFile = new File("F:\\111.jks");
try {
fos = new FileOutputStream(receiveFile);
} catch (FileNotFoundException e1) {
e1.printStackTrace();
} finally {
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 将线程放入线程池 当连接很多时候可以通过线程池处理
threadPool.execute(new IoStreamThreadWork(input, fos));
}
}
客户端Handler处理逻辑
package client;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.swing.DefaultListModel;
import javax.swing.JOptionPane;
import javax.swing.JTextArea;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;
import thread.IoStreamThreadWork;
import common.Common;
import frame.ChatFrame;
import frame.LoginFrame;
/**
* Mina Handler
*/
// IoHandlerAdapter
public class ClientHandler extends StreamIoHandler {
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
System.out.println(session.getLocalAddress() + " "
+ "exceptionCaught");
System.out.println(cause);
}
/**
* mina只能接收以换行符结束的信息
*/
@Override
public void messageReceived(IoSession session, Object message) {
System.out.println(session.getLocalAddress() + " "
+ "messageReceived");
// 解析信息
String result = (String) message;
// 注册成功
if (result.equals(Common.CANREG) && result.length() == 6) {
JOptionPane.showMessageDialog(null, "注册成功!");
}
// 用户已存在注册失败
else if (result.equals(Common.CANTRE) && result.length() == 6) {
JOptionPane.showMessageDialog(null, "用户已存在!");
}
// 用户已登录,登录失败
else if (result.equals(Common.IS_LOGIN) && result.length() == 6) {
JOptionPane.showMessageDialog(null, "用户已登录!");
session.close();// 关闭这个Session
}
// 帐号和密码不匹配,登录失败
else if (result.equals(Common.ISNT_MATCH) && result.length() == 6) {
JOptionPane.showMessageDialog(null, "帐号和密码不匹配!");
session.close();// 关闭这个Session
}
// 登录成功
else if (result.startsWith(Common.LOGIN_SUCCESS)
&& result.length() > 10) {
String suffix = result.substring(10);
// 从Attribute中取出登录窗口
LoginFrame frame = (LoginFrame) session.getAttribute("loginFrame");
// 关闭登录窗口
frame.dispose();
// 打开聊天窗口
new ChatFrame(session).setTitle("用户" + suffix + "的聊天窗口");
}
// 服务器发送在线列表消息
else if (result.startsWith(Common.SERVER) && result.length() > 10) {
String suffix = result.substring(10);
String[] array = suffix.split(",");
// 判断空,因为有loginSession
if (session.getAttribute("clientListModel") != null) {
// 取出clientListModel
DefaultListModel<String> clientListModel = (DefaultListModel<String>) (session
.getAttribute("clientListModel"));
// 清空原视图
clientListModel.removeAllElements();
// 重新加入在线用户列表
for (String clientName : array)
clientListModel.addElement(clientName);
}
}
// 客户发送过来的消息
else if (result.startsWith(Common.CLIENT) && result.length() > 10) {
String suffix = result.substring(10);
// if (session.getAttribute("receivedMessage") != null)
JTextArea jta = (JTextArea) session.getAttribute("receivedMessage");
jta.append(suffix + "\n");
// 滚动到底端
jta.setCaretPosition(jta.getText().length());
}
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
System.out.println(session.getLocalAddress() + " " + "messageSent");
}
@Override
public void sessionClosed(IoSession session) throws Exception {
System.out.println(session.getLocalAddress() + " " + "sessionClosed");
}
@Override
public void sessionCreated(IoSession session) throws Exception {
System.out
.println(session.getLocalAddress() + " " + "sessionCreated");
}
/**
* 会话空闲状态
*/
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
System.out.println(session.getLocalAddress() + " " + "sessionIdle");
}
@Override
public void sessionOpened(IoSession session) {
System.out.println(session.getLocalAddress() + " " + "sessionOpened");
}
@Override
protected void processStreamIo(IoSession session, InputStream input,
OutputStream output) {
System.out.println(session.getLocalAddress() + " "
+ "processStreamIo");
// 客户端发送文件
File sendFile = new File("e:\\MyKey.jks");
FileInputStream fis = null;
try {
fis = new FileInputStream(sendFile);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
// 放入线程让其执行
// 客户端一般都用一个线程实现即可 不用线程池
new IoStreamThreadWork(fis, output).start();
return;
}
}
在 Mina 的使用过程中,应注意以下问题:
【1】客户端IoSession.setAttribute后,服务器IoSession.getAttribute为null,这两个session虽然getId一样,但不是同一个session
【2】使用TextLineCodecFactory,messageReceived方法默认只能接收以换行符结束的信息
【3】使用getAttribute时应注意判断空指针:if (session.getAttribute("name") != null && session.getAttribute("name").equals(name))
否则不会报错,但下面的程序段会终止执行。可在exceptionCaught方法中catch到异常
【4】每个客户端对应一个ConnectFuture,每个ConnectFuture对应一个IoSession
【5】可以巧用session.setAttribute("loginFrame", LoginFrame.this);来进行窗口的传递,注意:如果在监听中直接传this,要注意是不是LoginFrame.this
【6】mina不能动态修改acceptor的ProtocolCodecFilter和Handler
【7】传输文件可通过StreamIoHandler和MessageEncoder的方式
整个案例的源码及相关 jar 包在百度云中给出链接 Mina,读者如要运行项目,还需自行安装 MySQL 创建数据库。
谢谢您的关注和支持!