直接代码
package com.util;
import com.mongodb.*;
import com.mongodb.client.*;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.DeleteResult;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.beans.factory.annotation.Value;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* mongodb 工具类
*
* @author huxiaofei
* @Version 1.0
*/
@Slf4j
public class MongodbUtil {
@Value("${spring.data.mongodb.database}")
private static String dbName;
@Value("${spring.data.mongodb.collection}")
private static String collectionName;
/**
* 创建集合
*
* @param client
*/
public static void createCollection(final MongoClient client) {
try {
client.getDatabase(dbName).createCollection(collectionName);
} catch (MongoCommandException e) {
if (!e.getErrorCodeName().equals("NamespaceExists")) {
throw e;
}
}
}
/**
* 获取集合
*
* @param client
* @return
*/
public static MongoCollection<Document> getCollection(final MongoClient client) {
return client.getDatabase(dbName).getCollection(collectionName);
}
/**
* 事务操作
*
* @param client
* @param document 需要传的具体参数
*/
public static void mongoTransaction(final MongoClient client, Document document) {
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.MAJORITY)
.writeConcern(WriteConcern.MAJORITY)
.build();
//事务逻辑
try (ClientSession clientSession = client.startSession()) {
clientSession.withTransaction(
() -> {
// 获取数据库
MongoDatabase db = client.getDatabase(dbName);
MongoCollection<Document> collection = db.getCollection(collectionName);
//插入单条数据
collection.insertOne(clientSession, document);
return null;
},
txnOptions
);
} catch (MongoException e) {
log.error("Transaction aborted. Caught exception during transaction.");
throw e;
}
}
/**
* 事务操作插入多条数据
*
* @param client
* @param document
*/
public static boolean mongoInsertManyTransaction(final MongoClient client, List<Document> document) {
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.MAJORITY)
.writeConcern(WriteConcern.MAJORITY)
.build();
// 事务逻辑
try (ClientSession clientSession = client.startSession()) {
clientSession.withTransaction(
() -> {
// 获取数据库
MongoDatabase db = client.getDatabase(dbName);
MongoCollection<Document> collection = db.getCollection(collectionName);
//插入多条文档数据
collection.insertMany(clientSession, document);
return true;
},
txnOptions
);
}
return false;
}
/**
* 根据条件查询
*
* @param client
* @param conditions map类型
* @return
*/
public static List<Document> findDocuments(final MongoClient client, Map<String, Object> conditions) {
Document query = new Document();
// 拼接查询条件
for (Map.Entry<String, Object> entry : conditions.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
query.append(key, value);
}
return getCollection(client).find(query).into(new ArrayList<>());
}
/**
* 多条件查询
*
* @param client
* @param conditions Filters.eq("name", "John Doe"),
* Filters.gt("age", 25)
* @return
*/
public static List<Document> findDocumentsWithMultipleConditions(final MongoClient client, Bson... conditions) {
MongoCollection<Document> collection = getCollection(client);
Bson queryCondition = Filters.and(conditions); // 组合多个查询条件
List<Document> results = new ArrayList<>();
collection.find(queryCondition).iterator().forEachRemaining(results::add);
return results;
}
/**
* 根据条件批量更新
*
* @param client
* @param filterMap
* @param updateMap
*/
public static void updateDocuments(final MongoClient client, Map<String, Object> filterMap, Map<String, Object> updateMap) {
MongoCollection<Document> collection = getCollection(client);
// 构建过滤条件
List<Bson> filters = new ArrayList<>();
for (Map.Entry<String, Object> entry : filterMap.entrySet()) {
filters.add(Filters.eq(entry.getKey(), entry.getValue()));
}
// 构建更新操作
List<Bson> updates = new ArrayList<>();
for (Map.Entry<String, Object> entry : updateMap.entrySet()) {
updates.add(Updates.set(entry.getKey(), entry.getValue()));
}
// 应用过滤条件和更新操作
Bson filter = Filters.and(filters);
Bson update = Updates.combine(updates);
// 执行更新操作
collection.updateMany(filter, update);
}
/**
* 根据条件删除
*
* @param client
* @param filterMap 删除条件
*/
public static void deleteDocumentsByCondition(final MongoClient client, Map<String, Object> filterMap) {
MongoCollection<Document> collection = getCollection(client);
// 构建过滤条件
List<Bson> filters = new ArrayList<>();
for (Map.Entry<String, Object> entry : filterMap.entrySet()) {
filters.add(Filters.eq(entry.getKey(), entry.getValue()));
}
// 删除条件
Bson filter = Filters.and(filters);
// 批量删除
DeleteResult deleteResult = collection.deleteMany(filter);
log.info("Deleted " + deleteResult.getDeletedCount() + " documents");
}
/**
* 普通插入一条数据
*
* @param client
* @param document
*/
public static void insertDocument(final MongoClient client, Document document) {
client.getDatabase(dbName).getCollection(collectionName).insertOne(document);
}
/**
* 递归查询
* children 方式
*
* @param collection
* @param parentNode
*/
public static void performRecursiveQuery(MongoCollection<Document> collection, Document parentNode) {
if (parentNode != null) {
log.info("Parent Node: " + parentNode);
// Get children nodes and recursively query each child
if (parentNode.containsKey("children")) {
for (String childId : (List<String>) parentNode.get("children")) {
// 查询出符合条件的
Document childNode = findNodeById(collection, childId);
performRecursiveQuery(collection, childNode);
}
}
}
}
/**
* 根据_id查询一条数据
*
* @param collection
* @param nodeId
* @return
*/
public static Document findNodeById(MongoCollection<Document> collection, String nodeId) {
return collection.find(new Document("_id", nodeId)).first();
}
/**
* 自旋重试提交
*
* @param clientSession
*/
private void commitWithRetry(final ClientSession clientSession) {
while (true) {
try {
clientSession.commitTransaction();
break;
} catch (MongoException e) {
// can retry commit
if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
log.info("UnknownTransactionCommitResult, retrying commit operation ...");
} else {
log.error("Exception during commit ...");
throw e;
}
}
}
}
}