推荐引擎场景方案的设计
说明:
推荐引擎的一般设计包括:推荐入口、场景服务(scene)、方案服务(solution),三者之间通过Map的数据结构进行组织。
推荐入口通过维护场景的Map对象[Map<String, SceneImpl> Scenes],方便通过场景的名字找到对应的场景对象。
场景服务(scene)通过维护方案的Map对象[Map<String, SolutionImpl> solutions],方便通过方案的名字找到对应的方案对象。
场景服务(scene)通过维护方案的筛选规则[SolutionSelector selector],方便根据分流规则返回方案的名字,进而通过方案的Map对象查找对应的方案对象。
// 推荐引擎的核心服务
public class RecomServiceImpl {
private Map<String, SceneImpl> Scenes
public RecomResult recommend(RecomParam param) {
// 省略相关代码
}
}
// 推荐引擎的场景(scene)服务
public class SceneImpl implements RecommendHandler {
private final String sceneName;
private Map<String, SolutionImpl> solutions;
private SolutionSelector selector;
public SceneImpl(EngineContext context, RecomEngineConfig
recomEngineConfig, String sceneName) {
this.sceneName = sceneName;
this.solutions = ImmutableMap.copyOf(new HashMap<String, Solution>());
}
}
// 推荐引擎的方案(solution)服务
public class SolutionImpl implements RecommendHandler {
private String sceneName;
private String solutionName;
private RecommendHandler handler;
private URLClassLoader classLoader;
private RecomEngineConfig recomEngineConfig;
private SolutionImpl(RecomEngineConfig recomEngineConfig, String sceneName, String solutionName)
{
this.sceneName = sceneName;
this.solutionName = solutionName;
this.recomEngineConfig = recomEngineConfig;
this.solutionJarPath = File.separator + solutionName + ".jar";
try {
File jarFile = new File(solutionJarPath);
classLoader = new URLClassLoader(new URL[] { new File(solutionJarPath).toURI().toURL() },
Thread.currentThread().getContextClassLoader());
Class<?> clazz = classLoader.loadClass(SolutionUtils.getClassName(sceneName, solutionName));
handler = (RecommendHandler) (clazz.newInstance());
handler.init(context);
} catch (Throwable t) {
// 省略相关代码
}
}
public static SolutionImpl build(final EngineContext context, final RecomEngineConfig recomEngineConfig,
String sceneName, String solutionName) {
Solution solution = null;
try {
solution = new SolutionImpl(context, recomEngineConfig, sceneName, solutionName);
return solution;
} catch (Throwable t) {
}
}
}
推荐引擎推荐流程
说明:
推荐引擎的执行流程按照如下步骤进行:根据场景名找到场景对象、根据场景对象找到方案对象,执行方案的方法。
根据场景名找到对应的场景,SceneImpl scene = this.Scenes.get(sceneId)。
根据方案名找到对应的方案,SolutionImpl solution = solutions.get(solutionName)。
执行方案的推荐方法进行推荐,handler.recommend(param)。
public class RecomServiceImpl {
public RecomResult recommend(RecomParam param) {
RecomResult result = null;
try{
String sceneId = param.getSceneId();
Scene scene = this.Scenes.get(sceneId);
result = scene.recommend(reqId, condition);
}finally {
// 省略相关代码
}
return result;
}
}
public class SceneImpl implements RecommendHandler {
public RecomResult recommend(RecomParam param) {
return doRecommand(param);
}
private RecomResult doRecommand(RecomParam param) {
try {
String solutionName = selector.selectSolution(condition);
SolutionImpl solution = solutions.get(solutionName);
return solution.recommend(reqId, condition);
} catch (Throwable t) {
// 省略相关代码
}
}
}
public class SolutionImpl implements RecommendHandler {
private RecommendHandler handler;
private URLClassLoader classLoader;
private SolutionImpl(RecomEngineConfig recomEngineConfig, String sceneName, String solutionName)
{
this.sceneName = sceneName;
this.solutionName = solutionName;
this.recomEngineConfig = recomEngineConfig;
this.solutionJarPath = File.separator + solutionName + ".jar";
try {
File jarFile = new File(solutionJarPath);
classLoader = new URLClassLoader(new URL[] { new File(solutionJarPath).toURI().toURL() },
Thread.currentThread().getContextClassLoader());
Class<?> clazz = classLoader.loadClass(SolutionUtils.getClassName(sceneName, solutionName));
handler = (RecommendHandler) (clazz.newInstance());
handler.init(context);
} catch (Throwable t) {
// 省略相关代码
}
}
public RecomResult recommend(RecomParam param) {
RecomResult result = null;
try {
if (null != engineContext) {
result = handler.recommend(param);
}
return result;
} catch (Throwable t) {
// 省略相关代码
}
}
}
推荐引擎场景方案管理方案
说明:
推荐引擎方案场景管理主要包括程序启动方案的初始化和程序运行过程中方案的更新两大块内容。
推荐引擎方案场景及对应的配置信息通过zk进行管理,通过zk维护一系列的场景节点,每个场景节点配置有这个场景的所有配置信息包括方案流量配比等。
推荐引擎场景方案的初始化过程就是获取所有场景节点配置信息,进行配置的保存,和初始化动作。
负责读取场景的配置信息zkReadData(zkScenePath);负责保持场景的配置信息scenesConfigs.put(sceneName, data);负责初始化场景信息scene.update(data)。
scene.update(data)负责解析场景的配置信息生成方案的对象,主要逻辑是遍历场景下所有方案的配置信息生成solution对象。
scene.update(data)的另外一个核心的作用就是生成场景下所有方案的分流比例对象SceneSolutionSelector。
public class RecomServiceImpl {
private Map<String, Scene> currentScenes;
protected void syncSceneSolutionsFromZK() {
try {
Map<String, String> scenesConfigs = new HashMap<String, String>();
synchronized (mutex) {
// 获取所有场景的配置信息
List<String> scenes = zkGetChildren(zkSceneDir);
Map<String, Scene> newScenes = new HashMap<String, Scene>();
newScenes.putAll(currentScenes);
// 遍历所有场景获取场景的配置信息,并挨个进行处理。
for (String sceneName : scenes) {
String zkScenePath = recomConfig.getSceneZKPath(sceneName);
// 读取场景的配置信息
String data = zkReadData(zkScenePath);
// 保存场景的配置信息
scenesConfigs.put(sceneName, data);
try {
SceneImpl scene = currentScenes.get(sceneName);
// 根据场景是否为空创建场景对象
if (scene == null) {
scene = new SceneImpl(context, recomConfig, sceneName);
}
// 根据配置生成场景对象
if (!scene.update(data)) {
continue;
}
newScenes.put(sceneName, scene);
} catch (Throwable se) {
}
}
runningScenes = ImmutableMap.copyOf(newScenes);
}
} catch (Throwable t) {
}
}
public boolean update(String data) {
try {
SceneConfig newSceneConfig = JSON.parseObject(data, SceneConfig.class);
Map<String, SolutionImpl> newSolutions = new HashMap<String, SolutionImpl>();
for (String solutionName : newSceneConfig.allSolutions()) {
SolutionImplsolution = SolutionImpl.build(context, recomEngineConfig, sceneName, solutionName);
newSolutions.put(solutionName, solution);
}
// 生成场景所有方案的对象
solutions = ImmutableMap.copyOf(newSolutions);
// 生成分流选择器
selector = SceneSolutionSelector.build(newSceneConfig);
// 保存最新的场景配置信息
sceneConfig = newSceneConfig;
return true;
} catch (Throwable t) {
}
return false;
}
}
推荐引擎方案场景分流方案
说明:
推荐场景方案当中按照场景划分的方案当中,按照白名单用户的方案分流、按比例的方案分流、兜底的方案分流进行划分。
针对方案选择器的SceneSolutionSelector的对象,我们可以看到内部通过SolutionSelectorStrategy这个对象进行串联,每个SolutionSelectorStrategy 对象内部包含指向下一个 方案的nextStrategy对象。
SolutionSelectorStrategy作为所有方案选择策略的抽象基类,内部实现通用的方案选择操作,并提供抽象方法由各个选择策略自身去实现。
public class SceneSolutionSelector {
private SolutionSelectorStrategy strategy = null;
public String selectSolution(final RecomParam param) {
return this.strategy.selectSolution(param);
}
public static SceneSolutionSelector build(final SceneConfig conf) {
SceneSolutionSelector selector = new SceneSolutionSelector();
// 默认分流策略
String defaultSolution = conf.getDefaultSolution();
DefaultStrategy ds = new DefaultStrategy(defaultSolution);
selector.strategy = ds;
// 按比例分流策略
Map<String, Integer> ap = conf.getAccessPortion();
if (ap != null && ap.size() > 0) {
SpecifyRateStrategy srs = new SpecifyRateStrategy(ap, conf.isAccessByUid(), conf.getStrategyMap());
srs.setNext(selector.strategy);
selector.strategy = srs;
}
// 白名单策略
Map<Long, String> dau = conf.getDirectAccessUser();
if (dau != null && dau.size() > 0) {
DirectAccessUserStrategy daus = new DirectAccessUserStrategy(dau);
daus.setNext(selector.strategy);
selector.strategy = daus;
}
return selector;
}
}
public abstract class SolutionSelectorStrategy {
private SolutionSelectorStrategy nextStrategy = null;
public void setNext(SolutionSelectorStrategy strategy) {
this.nextStrategy = strategy;
}
public abstract String takeSolution(RecomParam condition);
public String selectSolution(RecomParam condition) {
String solutionName = takeSolution(condition);
if (solutionName != null) {
return solutionName;
}
if (nextStrategy != null) {
return nextStrategy.selectSolution(condition);
}
return null;
}
}
public class DefaultStrategy extends SolutionSelectorStrategy {
private String defaultSolution = null;
public DefaultStrategy(String defaultSolution) {
this.defaultSolution = defaultSolution;
}
public String takeSolution(RecomParam condition) {
return defaultSolution;
}
}
public class DirectAccessUserStrategyextends SolutionSelectorStrategy {
ImmutableMap<Long, String> directAccessUser = null;
public DirectAccessUserStrategy(Map<Long, String> directAccessUser) {
this.directAccessUser = ImmutableMap.copyOf(directAccessUser);
}
@Override
public String takeSolution(RecomParam condition) {
if (this.directAccessUser == null) {
return null;
}
Long uid = condition.getUid();
if (uid == null) {
return null;
}
String solutionName = directAccessUser.get(uid);
if (solutionName == null) {
return null;
}
return solutionName;
}
}
public class SpecifyRateStrategy extends SolutionSelectorStrategy {
String[] solutionPortion = null;
//是否按照uid进行分流
private boolean accessByUid = false;
//按照uid进行分流的分流策略
Map<Integer, String> strategyMap = new HashMap<Integer,String>();
public SpecifyRateStrategy(Map<String, Integer> solutionRates, boolean accessByUid, Map<Integer, String> strategyMap) {
this.accessByUid = accessByUid;
if (solutionRates != null && solutionRates.size() > 0) {
solutionPortion = new String[SceneConfig.TOTAL_BUCKET_PORTION];
int idx = 0;
List<String> solutionList = new ArrayList<String>();
for(String solutionName : solutionRates.keySet()){
solutionList.add(solutionName);
}
Collections.sort(solutionList);
for (String solutionName : solutionList) {
int portion = solutionRates.get(solutionName);
for (int i = 0; i < portion; i++) {
solutionPortion[idx++] = solutionName;
if (idx > solutionPortion.length) {
throw new IllegalArgumentException("Invalidate rate " + solutionRates);
}
}
}
while (idx < solutionPortion.length) {
solutionPortion[idx++] = null;
}
}
if(null != strategyMap && strategyMap.size() > 0){
this.strategyMap = strategyMap;
}
}
public String takeSolution(RecomParam condition) {
Long uid = condition.getUid();
if (uid == null) {
return null;
}
if(accessByUid){
if(null == strategyMap){
return null;
}
int hashCode = Math.abs(uid.toString().hashCode());
int idx = (int) (hashCode % SceneConfig.UID_TOTAL_BUCKET_PORTION);
return strategyMap.get(Integer.valueOf(idx));
}else{
String sceneId = condition.getSceneId();
if(StringUtils.isEmpty(sceneId)){
return null;
}
int sceneIdHashCode = sceneId.hashCode();
if (sceneIdHashCode < 0){
sceneIdHashCode = -sceneIdHashCode;
}
if (uid < 0) {
uid = -uid;
}
long id = uid + sceneIdHashCode;
int idx = (int) (id % SceneConfig.TOTAL_BUCKET_PORTION);
return solutionPortion[idx];
}
}
}
推荐引擎方案场景同步方案
说明:
- 推荐的场景方案更新通过监听zk的节点感知变更,然后重新走syncSceneSolutionsFromZK()初始化流程然后用新生成的场景方案对象替换旧的场景方案对象。
public class RecomPlatformService {
protected void watchTrigger() {
client.subscribeDataChanges(recomEngineConfig.getZKTriggerPath(), new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
syncSceneSolutionsFromZK();
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
syncSceneSolutionsFromZK();
}
});
}
}