多租户适配
需要从产品底层进行尽量少的改造,能够满足上云之后多租户的数据、缓存、定时任务等隔离
多租户适配条目
条目名称 | 适配方案 |
---|---|
持久层适配 | 支持schema和字段隔离两种方案 |
quartz定时任务 | 上下文无法获取租户信息,通过JobGroup识别 |
reids缓存 | 缓存key体现租户id即可 |
websocket场景 | 从cookie获取、前端调用diwork的api获取租户信息塞到cookie,后端websocket握手后从cookie获取 |
1. 持久层适配
考虑到产品业务的实际情况,要求数据源同时支持schema隔离和字段隔离,持久层的多租户适配业务代码需要零感知、无侵入,适配实现过程如下:
STEP-1. 表结构改造,追加租户字段、有预置脚本的表,需要跟租户字段建立联合主键;
STEP-2. 引入动态数据源,动态数据源查询租户信息,切换schema实现租户按schema隔离;
STEP-3. 改造dao,采用cglib加入Interceptor,在dao层方法的执> 行前加入拦截;
STEP-4. 用jsqlParser编写sql解析类,第3步拦截到的sql追加租户ID的条件;
动态数据源关键代码
获取租户信息中的schema信息,根据schema信息切换,租户信息通过rest接口获取,考虑了到性能已加ThreadLocal和redis两重缓存
protected Connection changeCatalog(Connection con) throws SQLException {
String tenantId = InvocationInfoProxy.getTenantid();
if (StringUtils.isBlank(tenantId)) {
tenantId = "tenant";
}
String catalog = this.getCatalog(tenantId);
if (StringUtils.isNotBlank(catalog)) {
try {
con.setCatalog(catalog);
} catch (SQLException e) {
logger.error("Error occurred when setting catalog for connection, Tenant ID is {}", tenantId);
con.close();
throw e;
}
} else {
// logger.error("Switching catalog failed, check tenant ID -> {}!", tenantId);
String defaultCatalog = PropertyUtil.getPropertyByKey("jdbc.catalog");
if (StringUtils.isNotBlank(defaultCatalog) && !defaultCatalog.equals(con.getCatalog())) {
con.setCatalog(defaultCatalog);
logger.info("reset catalog for connection success!");
}
}
return con;
}
dao层改造关键代码
通过cglib代理的方式改造dao层,业务代码对租户隔离零感知
protected MdmJdbcPersistenceManager createPersistenceManager() throws DbException {
if (this.manager == null) {
try {
this.lock.lock();
if (this.manager == null) {
MdmJdbcSession jdbcSession = ProxyFactory.getProxy(
MdmJdbcSession.class,
new Class[]{JdbcTemplate.class, DBMetaHelper.class},
new Object[] {jdbcTemplate, dbMetaHelper},
new MdmJdbcPersistenceFilter(),
//0 无操作
NoOp.INSTANCE,
// 执行SQL
new ExecuteInterceptor(jdbcTemplate, dbMetaHelper));
manager = new MdmJdbcPersistenceManager(jdbcTemplate, dbMetaHelper, jdbcSession);
}
} finally {
this.lock.unlock();
}
}
return (MdmJdbcPersistenceManager) this.manager;
}
Interceptor 关键代码
@Override
public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
List<String> sqlList = new ArrayList<>();
try {
if (objects[SQL_INDEX] instanceof String) {
sqlList = Collections.singletonList(String.valueOf(objects[SQL_INDEX]));
} else if (objects[SQL_INDEX] instanceof List) {
sqlList = (List<String>) objects[SQL_INDEX];
}
} catch (Exception e) {
logger.error("Errors occurred when extract sql from jdbc session, details:" + e.getMessage(), e);
}
if (CollectionUtils.isNotEmpty(sqlList)) {
List<String> processedSqlList = MdmSQLParser.process(sqlList);
if (CollectionUtils.isNotEmpty(processedSqlList)) {
if (objects[SQL_INDEX] instanceof String) {
objects[SQL_INDEX] = processedSqlList.get(0);
} else if (objects[SQL_INDEX] instanceof List) {
objects[SQL_INDEX] = processedSqlList;
}
}
}
return methodProxy.invokeSuper(o, objects);
}
sqlParser关键代码
采用jSqlParser解析sql语句,并拼接租户id的条件,sql语法解析会消耗部分性能,为了提高性能加入了缓存
public static String parseAndProcess(String oldSql) throws Exception {
String cacheSql = getCache(oldSql);
if(!CommonUtils.isNULL(cacheSql)) {
return cacheSql;
}
Statement stmt = CCJSqlParserUtil.parse(oldSql);
if (stmt instanceof Select) {
Select select = (Select) stmt;
logger.debug("select-sql处理前:" + select);
//检查、处理select
checkAndHandleSelectBody(select.getSelectBody());
logger.debug("select-sql处理后:" + select);
} else if (stmt instanceof Insert){
Insert insert = (Insert) stmt;
logger.debug("insert-sql处理前:" + stmt);
processInsert(insert);
logger.debug("insert-sql处理后:" + stmt);
} else if (stmt instanceof Update) {
Update update = (Update) stmt;
logger.debug("update-sql处理前:" + stmt);
processUpdate(update);
logger.debug("update-sql处理后:" + stmt);
} else if (stmt instanceof Delete) {
Delete delete = (Delete) stmt;
logger.debug("delete-sql处理前:" + stmt);
processDelete(delete);
logger.debug("delete-sql处理后:" + stmt);
}
//其他形式语句暂不处理
putCache(oldSql, stmt.toString());
return stmt.toString();
}
2. 定时任务适配
通过租户开通的回调函数,在其中通过消息驱动的方式,在主数据实例中通过消费方式,来给租户启动定时任务,租户的id即为定时任务的JobGroup,这样job在执行业务逻辑时,可以通过JobGroup获取租户信息,以下代码是通过redis发布订阅方式实现,也可以通过mq实现
final JedisPubSub jedisPubSub = new JedisPubSub() {
@SuppressWarnings("unchecked")
@Override
public void onMessage(String channel, String message) {
try {
if (CHANNEL.equals(channel) && StringUtils.isNotBlank(message.trim())) {
channelMessage[0] = message;
InvocationInfoProxy.setTenantid(message);
//数据统计的定时任务
String statisticJobGroup = STATISTIC_ANALYSIS_JOB_GROUP;
String statisticIdleJobName = "jobDetailStatisticAnalysisBgJob";
String statisticIdleJobNameByDayJobName = "jobDetailStatisticAnalysisByDayBgJob";
if (!QuartzManager.checkExists(statisticJobGroup, statisticIdleJobName)) {
Class idleClazz = Class.forName("com.yonyou.iuapmdm.scheduleJob.task.StatisticAnalysisBgJob");
//"0 30 1,12 * * ?"
String idleCronExp = AppUtils.getPropertyValue(APPLICATION_PROPERTIES, "idleCronExp");
QuartzManager.addJob(statisticJobGroup, statisticIdleJobName, idleClazz, null, idleCronExp);
}
if (!QuartzManager.checkExists(statisticJobGroup, statisticIdleJobNameByDayJobName)) {
Class byDayClazz = Class.forName("com.yonyou.iuapmdm.scheduleJob.task.StatisticAnalysisByDayBgJob");
//"0 0 1 * * ?"
String byDayCronExp = AppUtils.getPropertyValue(APPLICATION_PROPERTIES, "byDayCronExp");
QuartzManager.addJob(statisticJobGroup, statisticIdleJobNameByDayJobName, byDayClazz, null, byDayCronExp);
}
//标签过期扫描定时任务
String tagExpireScanJobGroup = TAG_EXPIRE_SCAN_JOB_GROUP;
String tagExpireScanJobName = "jobDetailTagExpireScanBgJob";
if (!QuartzManager.checkExists(tagExpireScanJobGroup, tagExpireScanJobName)) {
Class expireScanClz = Class.forName("com.yonyou.iuapmdm.scheduleJob.task.TagExpireScanBgJob");
//"0 0 * * * ?" 每一小时执行一次
String expireScanCronExp = AppUtils.getPropertyValue(APPLICATION_PROPERTIES, "tagExpireScanCronExp");
QuartzManager.addJob(tagExpireScanJobGroup, tagExpireScanJobName, expireScanClz, null, expireScanCronExp);
}
}
} catch (Exception e) {
logger.error("Error occurred adding statistic analysis job, details:" + e.getMessage(), e);
}
}
};
Thread daemon = new Thread(() -> {
MdmCacheManager.getInstance().subscribe(jedisPubSub, CHANNEL);
});
3.redis缓存适配
比较简单,构造key的时候体现tenantId即可
private String buildKey(String key) {
String tenantId = InvocationInfoProxy.getTenantid();
if (StringUtils.isBlank(tenantId)) {
tenantId = "tenant";
}
return StringUtils.join(new String[]{tenantId, key}, ":");
}
4.websocket场景
从WebSocketSession中获取cookie信息,设置上下文即可
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
final List<String> cookie = session.getHandshakeHeaders().get("cookie");
setLoginInfo(cookie);
}
private void setLoginInfo(List<String> cookieList) {
if(cookieList != null && cookieList.size()>0) {
String cookie = cookieList.get(0);
String local = CommonUtils.getULocale(cookie);
String _A_P_userId = CommonUtils.getCookieValue(cookie, "_A_P_userId");
String _A_P_userLoginName = CommonUtils.getCookieValue(cookie, "_A_P_userLoginName");
if (StringUtils.isBlank(_A_P_userId) || StringUtils.isBlank(_A_P_userLoginName)) {
_A_P_userId = CommonUtils.getCookieValue(cookie, "yonyou_uid");
_A_P_userLoginName = CommonUtils.decodeTwice(CommonUtils.getCookieValue(cookie, "yonyou_uname"));
}
InvocationInfoProxy.setLocale(local);
InvocationInfoProxy.setUserid(_A_P_userId);
InvocationInfoProxy.setUsername(_A_P_userLoginName);
String tenantId = CommonUtils.getCookieValue(cookie, "tenantId");
String tenantIdValue = StringUtils.isBlank(tenantId) ? "t6ecrakt" : tenantId;
InvocationInfoProxy.setTenantid(tenantIdValue);
}
}
关注同名公主号,获取更多优质文章