基于 MybatisPlus 逻辑删除开启情况下兼容多数据库实现批量插入更新通用流程实现
背景
- 项目上业务流程有大量使用
mysql
批量插入更新语法insert on duplicate update
- 由于现在公司业务需要,同时兼容达梦数据库使用,达梦数据库虽然也有类似的语法,不过使用起来也比较麻烦,生成相应的
SQL
可以看我这一篇文章
核心问题
-
MySQL
更新插入流程如何使用实现? - 如何实现多个数据库兼容插入更新的流程?
- 如何编写工具类优雅实现插入通用流程封装以及整合
MybatisPlus
lambda
表达式查询,达到方便易用的效果? - 开启
MybatisPlus
逻辑删除功能,怎么通过自定义SQL
查询出所有数据(插入更新流程可能涉及到所有的数据,并不是只是处理未逻辑删除的数据)? - 批量数据插入更新速度如何优化?
代码实现
实现多数据库兼容插入更新操作,只能根据 MySQL
插入更新原理利用代码抽象通用化流程,较好的通用化方式是使用 MybatisPlus
自带的通用 CRUD
逻辑方法实现,但是原有自带方法开启逻辑删除功能以后,查询方法都会自带过滤逻辑删除的数据,需要实现自定义 SQL
注入器,为了实现一套不带逻辑删除的通用方法
拓展自定义 SQL
注入器实现
新增自定义方法
SelectList
方法不带逻辑删除
public class SelectListWithoutLogicDelete extends AbstractMethod {
@Override
public MappedStatement injectMappedStatement(Class<?> mapperClass, Class<?> modelClass, TableInfo tableInfo) {
String sql = String.format(
SqlMethod.SELECT_LIST.getSql(),
sqlFirst(),
sqlSelectColumns(tableInfo, true),
tableInfo.getTableName(),
sqlWhereEntityWrapper(true, tableInfo),
sqlComment()
);
SqlSource sqlSource = languageDriver.createSqlSource(configuration, sql, modelClass);
String mapperMethodName = StrUtil.lowerFirst(getClass().getSimpleName());
return addSelectMappedStatementForTable(mapperClass, mapperMethodName, sqlSource, tableInfo);
}
// 重写 sqlWhereEntityWrapper ,去掉逻辑删除相关代码
@Override
protected String sqlWhereEntityWrapper(boolean newLine, TableInfo table) {
String sqlScript = table.getAllSqlWhere(false, true, WRAPPER_ENTITY_DOT);
sqlScript = SqlScriptUtils.convertIf(sqlScript, String.format("%s != null", WRAPPER_ENTITY), true);
sqlScript += NEWLINE;
sqlScript += SqlScriptUtils.convertIf(String.format(SqlScriptUtils.convertIf(" AND", String.format("%s and %s", WRAPPER_NONEMPTYOFENTITY, WRAPPER_NONEMPTYOFNORMAL), false) + " ${%s}", WRAPPER_SQLSEGMENT),
String.format("%s != null and %s != '' and %s", WRAPPER_SQLSEGMENT, WRAPPER_SQLSEGMENT,
WRAPPER_NONEMPTYOFWHERE), true);
sqlScript = SqlScriptUtils.convertWhere(sqlScript) + NEWLINE;
sqlScript += SqlScriptUtils.convertIf(String.format(" ${%s}", WRAPPER_SQLSEGMENT),
String.format("%s != null and %s != '' and %s", WRAPPER_SQLSEGMENT, WRAPPER_SQLSEGMENT,
WRAPPER_EMPTYOFWHERE), true);
sqlScript = SqlScriptUtils.convertIf(sqlScript, String.format("%s != null", WRAPPER), true);
return newLine ? NEWLINE + sqlScript : sqlScript;
}
}
UpdateById
方法不带逻辑删除
public class UpdateByIdWithoutLogicDelete extends AbstractMethod {
@Override
public MappedStatement injectMappedStatement(Class<?> mapperClass, Class<?> modelClass, TableInfo tableInfo) {
SqlMethod sqlMethod = SqlMethod.UPDATE_BY_ID;
final String additional = optlockVersion(tableInfo);
String sql = String.format(
sqlMethod.getSql(),
tableInfo.getTableName(),
// 搬运 UpdateById 代码,第一个参数变化了
sqlSet(false, false, tableInfo, false, ENTITY, ENTITY_DOT),
tableInfo.getKeyColumn(),
ENTITY_DOT + tableInfo.getKeyProperty(),
additional
);
SqlSource sqlSource = languageDriver.createSqlSource(configuration, sql, modelClass);
String mapperMethodName = StrUtil.lowerFirst(getClass().getSimpleName());
return addUpdateMappedStatement(mapperClass, modelClass, mapperMethodName, sqlSource);
}
}
注入不带逻辑删除自定义 mapper
方法
public class SqlInjectorExtension extends DefaultSqlInjector {
@Override
public List<AbstractMethod> getMethodList(Class<?> mapperClass) {
List<AbstractMethod> methods = super.getMethodList(mapperClass);
// 原来基础上注入两个新方法
methods.add(new SelectListWithoutLogicDelete());
methods.add(new UpdateByIdWithoutLogicDelete());
return methods;
}
}
封装自定义 mapper
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
import com.baomidou.mybatisplus.extension.conditions.query.QueryChainWrapper;
import com.baomidou.mybatisplus.extension.conditions.update.LambdaUpdateChainWrapper;
import com.baomidou.mybatisplus.extension.conditions.update.UpdateChainWrapper;
import org.apache.ibatis.annotations.Param;
public interface BaseMapperExtension<T> extends BaseMapper<T> {
// === ChainWrappers 工具类转发 ===
/**
* 链式查询 普通
*
* @return QueryWrapper 的包装类
*/
default QueryChainWrapper<T> queryChain() {
return new QueryChainWrapper<>(this);
}
/**
* 链式查询 lambda 式
* <p>注意:不支持 Kotlin </p>
*
* @return LambdaQueryWrapper 的包装类
*/
default LambdaQueryChainWrapper<T> lambdaQueryChain() {
return new LambdaQueryChainWrapper<>(this);
}
/**
* 链式更改 普通
*
* @return UpdateWrapper 的包装类
*/
default UpdateChainWrapper<T> updateChain() {
return new UpdateChainWrapper<>(this);
}
/**
* 链式更改 lambda 式
* <p>注意:不支持 Kotlin </p>
*
* @return LambdaUpdateWrapper 的包装类
*/
default LambdaUpdateChainWrapper<T> lambdaUpdateChain() {
return new LambdaUpdateChainWrapper<>(this);
}
// === 自定义 sql ===
/**
* 忽略逻辑删除功能的 selectList 方法
*
* @param queryWrapper
* 查询条件
*
* @return {@code List<T>}
*/
List<T> selectListWithoutLogicDelete(@Param(Constants.WRAPPER) Wrapper<T> queryWrapper);
/**
* 忽略逻辑删除功能的 updateById 方法
*
* @param entity
* 实体
*
* @return int
*/
int updateByIdWithoutLogicDelete(@Param(Constants.ENTITY) T entity);
}
封装自定义 IServiceExtension
和 ServiceImplExtension
模板
public interface IServiceExtension<T extends IBaseDO<T>> extends IService<T> {
@Override
BaseMapperExtension<T> getBaseMapper();
// === BaseMapperExtension 新默认方法 ===
/**
* 通过 id 更新实体(忽略逻辑删除功能)
*
* @param entity
* 实体数据
*
* @return boolean
*/
default boolean updateByIdWithoutLogicDelete(T entity) {
return SqlHelper.retBool(getBaseMapper().updateByIdWithoutLogicDelete(entity));
}
default List<T> listWithoutLogicDelete(Wrapper<T> queryWrapper) {
return getBaseMapper().selectListWithoutLogicDelete(queryWrapper);
}
default List<T> listWithoutLogicDelete() {
return listWithoutLogicDelete(Wrappers.emptyWrapper());
}
// === 增强型方法 ===
/**
* 批量根据 ID 更新(忽略逻辑删除)
*
* @param entityList
* 实体列表
* @param batchSize
* 批次大小
*
* @return boolean
*/
boolean updateBatchByIdWithoutLogicDelete(Collection<T> entityList, int batchSize);
}
public abstract class ServiceImplExtension<M extends BaseMapperExtension<T>, T extends IBaseDO<T>> extends ServiceImpl<M, T> implements IServiceExtension<T> {}
新增 MybatisPlusUtils
封装
lambda
表达式通用插入更新流程
- 基础实体类接口
IBaseDO
import java.util.Date;
public interface IBaseDO<E> {
Long getId();
E setId(Long id);
Date getCreateDate();
E setCreateDate(Date createDate);
Date getUpdateDate();
E setUpdateDate(Date updateDate);
}
- 抽象实体类
@EqualsAndHashCode
public abstract class BaseDO<E extends BaseDO<E>> implements IBaseDO<E> {
@ApiModelProperty(value = "主键")
@TableId(value = "id", type = IdType.ASSIGN_ID)
private Long id;
@ApiModelProperty(value = "创建时间")
@TableField(value = "create_date", fill = FieldFill.INSERT)
private Date createDate;
@ApiModelProperty(value = "更新时间")
@TableField(value = "update_date", fill = FieldFill.INSERT_UPDATE)
private Date updateDate;
@Override
public Long getId() {
return id;
}
@Override
public E setId(Long id) {
this.id = id;
return self();
}
private E self() {
return (E) this;
}
@Override
public Date getCreateDate() {
return createDate;
}
@Override
public E setCreateDate(Date createDate) {
this.createDate = createDate;
return self();
}
@Override
public Date getUpdateDate() {
return updateDate;
}
@Override
public E setUpdateDate(Date updateDate) {
this.updateDate = updateDate;
return self();
}
public E fillLatestDate() {
Date date = new Date();
createDate = date;
updateDate = date;
return self();
}
public E fillDate(Date date) {
createDate = date;
updateDate = date;
return self();
}
}
-
MyBatisPlusUtils
工具类
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
import java.util.function.BiFunction;
import static java.util.stream.Collectors.*;
@Slf4j
public abstract class MyBatisPlusUtils {
/**
* mysql insert on duplicate key update 替代方法
*
* @param service
* service 类
* @param dataCollect
* 新数据列表
* @param isFillId
* 无 ID 是否填充 ID
* @param uniqueCondition
* 唯一索引
* @param updateMapping
* 更新 mapping:函数入参为查询到的旧数据 & 新数据,出参为更新后的旧数据(不能设置id和相关唯一索引)
* @param <T>
* 实体泛型
*/
public static <T extends IBaseDO<T>, S extends IServiceExtension<T>> List<T> insertOnUpdateBatch(
S service,
Collection<T> dataCollect,
boolean isFillId,
BiFunction<T, LambdaQueryWrapper<T>, LambdaQueryWrapper<T>> uniqueCondition,
BiFunction<T, T, T> updateMapping
) {
Objects.requireNonNull(service);
Objects.requireNonNull(dataCollect);
Objects.requireNonNull(updateMapping);
Objects.requireNonNull(uniqueCondition);
Class<T> entityClass = service.getEntityClass();
Objects.requireNonNull(entityClass);
if (CollectionUtils.isEmpty(dataCollect)) {
log.debug("[{}] insert on update data size: 0,ignore", service.getEntityClass().getSimpleName());
return new ArrayList<>();
}
Map<Boolean, List<T>> collect = dataCollect.stream()
.filter(Objects::nonNull)
.peek(newData -> execFillIdStrategy(isFillId, newData))
.flatMap(newData -> {
LambdaQueryWrapper<T> chainWrapper = uniqueCondition.apply(newData, Wrappers.lambdaQuery(service.getEntityClass()));
Objects.requireNonNull(chainWrapper);
// 构造通过 ID 或者 多个唯一索引查询条件,进行数据库查询,查询结果可能查询出来多条
List<Pair<Boolean, T>> updateData = service.listWithoutLogicDelete(chainWrapper.or(wrapper -> wrapper.eq(IBaseDO::getId, newData.getId())))
.stream()
.map(oldData -> {
// 查询出来需要更新的数据
Date newUpdateDate = newData.getUpdateDate();
Date oldCreateDate = oldData.getCreateDate();
Date now = new Date();
// 更新操作回调
T update = updateMapping.apply(newData, oldData)
// 指定填充默认更新字段
.setCreateDate(oldCreateDate == null ? now : oldCreateDate)
.setUpdateDate(newUpdateDate == null ? now : newUpdateDate);
// 转换器返回不同 ID 数据抛出异常
if (!Objects.equals(update.getId(), oldData.getId())) {
throw new RuntimeException("mapping can't return new data");
}
return Pair.of(false, update);
})
.collect(toList());
if (CollectionUtils.isNotEmpty(updateData)) {
return updateData.stream();
}
// 如果上面不符合,就是需要插入的新数据
List<Pair<Boolean, T>> insertData = Collections.singletonList(Pair.of(true, newData));
return insertData.stream();
})
.collect(groupingBy(Pair::getKey, mapping(Pair::getValue, toList())));
// 提取需要新插入的数据
List<T> insertData = collect.getOrDefault(true, new ArrayList<>());
if (CollectionUtils.isNotEmpty(insertData)) {
service.saveBatch(insertData);
}
// 提取需要更新的数据
List<T> updateData = collect.getOrDefault(false, new ArrayList<>());
if (CollectionUtils.isNotEmpty(updateData)) {
// 批量更新数据
service.updateBatchByIdWithoutLogicDelete(updateData);
}
log.debug(
"[{}] data size:[{}],insert data size:[{}],update data size:[{}]",
entityClass.getSimpleName(),
dataCollect.size(),
insertData.size(),
updateData.size()
);
insertData.addAll(updateData);
return insertData;
}
/**
* 执行填充ID策略
*
* @param isFillId
* 是否填充 ID
* @param newData
* 数据
* @param <T>
* 类型
*/
private static <T extends IBaseDO<T>> void execFillIdStrategy(boolean isFillId, T newData) {
Long id = newData.getId();
if (id != null) {
return;
}
if (!isFillId) {
throw new RuntimeException("data id can't null,data:" + newData);
}
newData.setId(IdWorker.getId());
}
}
session
批量写库优化
- 由于原来的实现只需要一条
SQL
进行批量更新,现在插入更新逻辑是在业务代码中实现,最少是需要 2 条SQL
,频繁发送SQL
进行读写操作耗时非常大,我们需要对插入更新操作的SQL
进行批量输入执行,使用BATCH
批处理模式,减少重复预编译的次数。
工具类整合
为了更加方便使用,
MybatisPlusUtils
工具类整合进去自定义IServiceExtension
和ServiceImplExtension
模板
- 整合后
IServiceExtension
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.extension.toolkit.SqlHelper;
import org.springframework.transaction.annotation.Transactional;
import java.util.Collection;
import java.util.List;
import java.util.function.BiFunction;
public interface IServiceExtension<T extends IBaseDO<T>> extends IService<T> {
@Override
BaseMapperExtension<T> getBaseMapper();
// === BaseMapperExtension 新默认方法 ===
/**
* 通过 id 更新实体(忽略逻辑删除功能)
*
* @param entity
* 实体数据
*
* @return boolean
*/
default boolean updateByIdWithoutLogicDelete(T entity) {
return SqlHelper.retBool(getBaseMapper().updateByIdWithoutLogicDelete(entity));
}
default List<T> listWithoutLogicDelete(Wrapper<T> queryWrapper) {
return getBaseMapper().selectListWithoutLogicDelete(queryWrapper);
}
default List<T> listWithoutLogicDelete() {
return listWithoutLogicDelete(Wrappers.emptyWrapper());
}
// === 增强型方法 ===
/**
* 批量根据 ID 更新(忽略逻辑删除)
*
* @param entityList
* 实体列表
* @param batchSize
* 批次大小
*
* @return boolean
*/
boolean updateBatchByIdWithoutLogicDelete(Collection<T> entityList, int batchSize);
/**
* 批量根据 ID 更新(忽略逻辑删除)
*
* @param entityList
* 实体列表
*
* @return boolean
*/
@Transactional(rollbackFor = Exception.class)
default boolean updateBatchByIdWithoutLogicDelete(Collection<T> entityList) {
return updateBatchByIdWithoutLogicDelete(entityList, DEFAULT_BATCH_SIZE);
}
/**
* mysql insert on duplicate key update 替代方法
*
* @param dataCollect
* 新数据列表
* @param isFillId
* 无 ID 是否填充 ID
* @param uniqueCondition
* 唯一索引
* @param updateMapping
* 更新函数:函数入参为查询到的旧数据 & 新数据,出参为更新后的旧数据(不能设置id和相关唯一索引)
*
* @return {@code List<T>}
*/
List<T> insertOnUpdateBatch(
Collection<T> dataCollect,
boolean isFillId,
BiFunction<T, LambdaQueryWrapper<T>, LambdaQueryWrapper<T>> uniqueCondition,
BiFunction<T, T, T> updateMapping
);
@Transactional(rollbackFor = Exception.class)
default List<T> insertOnUpdateBatch(
Collection<T> dataCollect,
BiFunction<T, LambdaQueryWrapper<T>, LambdaQueryWrapper<T>> uniqueCondition,
BiFunction<T, T, T> updateMapping
) {
return insertOnUpdateBatch(dataCollect, true, uniqueCondition, updateMapping);
}
}
- 整合后
ServiceImplExtension
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.ibatis.binding.MapperMethod;
import org.springframework.transaction.annotation.Transactional;
import java.util.Collection;
import java.util.List;
import java.util.function.BiFunction;
public abstract class ServiceImplExtension<M extends BaseMapperExtension<T>, T extends IBaseDO<T>> extends ServiceImpl<M, T> implements IServiceExtension<T> {
@Transactional(rollbackFor = Exception.class)
@Override
public boolean updateBatchByIdWithoutLogicDelete(Collection<T> entityList, int batchSize) {
String sqlStatement = mapperClass.getName() + StringPool.DOT + "updateByIdWithoutLogicDelete";
// 批量更新优化 使用 BATCH 批处理模式
return executeBatch(entityList, batchSize, (sqlSession, entity) -> {
MapperMethod.ParamMap<T> param = new MapperMethod.ParamMap<>();
param.put(Constants.ENTITY, entity);
sqlSession.update(sqlStatement, param);
});
}
@Transactional(rollbackFor = Exception.class)
@Override
public List<T> insertOnUpdateBatch(
Collection<T> dataCollect,
boolean isFillId,
BiFunction<T, LambdaQueryWrapper<T>, LambdaQueryWrapper<T>> uniqueCondition,
BiFunction<T, T, T> updateMapping
) {
return MyBatisPlusUtils.insertOnUpdateBatch(this, dataCollect, isFillId, uniqueCondition, updateMapping);
}
}
使用案例
基础实体
继承
BaseDO
类
@ApiModel(value = "网络拓扑图-区域")
@Data
@TableName(value = "cmdb_topology_region")
public class CmdbTopologyRegion extends BaseDO<CmdbTopologyRegion> {
@TableField(value = "tenant_code")
@ApiModelProperty(value = "租户编码")
private Long tenantCode;
}
Mapper 接口
继承
BaseMapperExtension
接口
public interface CmdbTopologyRegionMapper extends BaseMapperExtension<CmdbTopologyRegion> {
}
Service 接口
继承
IServiceExtension
接口
@Validated
public interface CmdbTopologyRegionService extends IServiceExtension<CmdbTopologyRegion> {
@Validated
int insertOrUpdate(List<CmdbTopologyRegion> regions);
}
Service 实现
@Slf4j
@Service
public class CmdbTopologyRegionServiceImpl extends ServiceImplExtension<CmdbTopologyRegionMapper, CmdbTopologyRegion> implements CmdbTopologyRegionService {
@Override
public int insertOrUpdate(List<CmdbTopologyRegion> regions) {
// regions 为插入更新的数据列表
List<CmdbTopologyRegion> update = insertOnUpdateBatch(regions,
// 唯一索引查询条件构造,这个案例只是一个唯一索引 TenantCode,如果没有唯一索引,直接 return wrapper,如果存在多个唯一索引,使用 wrapper.eq(唯一索引1 构造).or(w->w.eq(唯一索引2 构造))
(newData, wrapper) -> wrapper.eq(CmdbTopologyRegion::getTenantCode,"1002"),
(newData, oldData) -> {
// 存在数据时候,会调用这个更新回调方法,入参为(新数据,数据库查询数据),这里需要把 newData 的更新字段值填充进去 oldData 返回
BeanUtil.copyProperties(newData, oldData, "id","tenantCode");
return oldData;
}
);
return update.size();
}
}
问题回顾
- 详细查看
MySQL
插入更新原理 - 不同数据库的语法不一致,只能使用标准
SQL
,所以最好使用业务代码逻辑实现 - 详细查看
MyBatisPlusUtils
工具类的实现方式 - 自定义
SQL
注入器实现 - 使用
Mybatis BATCH
批处理模式
优点缺点
优点
- 简单易用,支持原有
MybatisPlus
的Lambda
表达式查询方式,不需要自定义SQL
实现 - 需要使用插入更新的时候,不需要考虑不同数据库兼容性
- 有清晰的业务代码逻辑实现,可以对每条需要更新数据的自定义属性拷贝和转换,同时具备对更新数据进行回调
- 对原有代码改造少:把原来继承实现的基础模板替换(
BaseMapper
,IService
,ServiceImpl
)
缺点
- 执行速度慢,
SQL
执行数量变多,原来只需要一条SQL
就可以实现批量插入更新,现在需要多条SQL
(包括查询、插入和更新语句)