推荐阅读:
文章推荐系统 | 一、推荐流程设计
文章推荐系统 | 二、同步业务数据
文章推荐系统 | 三、收集用户行为数据
文章推荐系统 | 四、构建离线文章画像
文章推荐系统 | 五、计算文章相似度
文章推荐系统 | 六、构建离线用户画像
文章推荐系统 | 七、构建离线文章特征和用户特征
文章推荐系统 | 八、基于模型的离线召回
在上篇文章中,我们实现了基于模型的离线召回,属于基于协同过滤的召回算法。接下来,本文就讲一下另一个经典的召回方式,那就是如何实现基于内容的离线召回。相比于协同过滤来说,基于内容的召回会简单很多,主要思路就是召回用户点击过的文章的相似文章,通常也被叫做 u2i2i。
离线召回
首先,读取用户历史行为数据,得到用户历史点击过的文章
spark.sql('use profile')
user_article_basic = spark.sql("select * from user_article_basic")
user_article_basic = user_article_basic.filter('clicked=True')
user_article_basic
结果如下所示
接下来,遍历用户历史点击过的文章,获取与之相似度最高的 K 篇文章即可。可以根据之前计算好的文章相似度表 article_similar 进行相似文章查询,接着根据历史召回结果进行过滤,防止重复推荐。最后将召回结果按照频道分别存入召回结果表及历史召回结果表
user_article_basic.foreachPartition(get_clicked_similar_article)
def get_clicked_similar_article(partition):
"""召回用户点击文章的相似文章
"""
import happybase
pool = happybase.ConnectionPool(size=10, host='hadoop-master')
with pool.connection() as conn:
similar_table = conn.table('article_similar')
for row in partition:
# 读取文章相似度表,根据文章ID获取相似文章
similar_article = similar_table.row(str(row.article_id).encode(),
columns=[b'similar'])
# 按照相似度进行排序
similar_article_sorted = sorted(similar_article.items(), key=lambda item: item[1], reverse=True)
if similar_article_sorted:
# 每次行为推荐10篇文章
similar_article_topk = [int(i[0].split(b':')[1]) for i in similar_article_sorted][:10]
# 根据历史召回结果进行过滤
history_table = conn.table('history_recall')
history_article_data = history_table.cells('reco:his:{}'.format(row.user_id).encode(), 'channel:{}'.format(row.channel_id).encode())
# 将多个版本都加入历史文章ID列表
history_article = []
if len(history_article_data) >= 2:
for article in history_article_data[:-1]:
history_article.extend(eval(article))
else:
history_article = []
# 过滤history_article
recall_article = list(set(similar_article_topk) - set(history_article))
# 存储到召回结果表及历史召回结果表
if recall_article:
content_table = conn.table('cb_recall')
content_table.put("recall:user:{}".format(row.user_id).encode(), {'content:{}'.format(row.channel_id).encode(): str(recall_article).encode()})
# 放入历史召回结果表
history_table.put("reco:his:{}".format(row.user_id).encode(), {'channel:{}'.format(row.channel_id).encode(): str(recall_article).encode()})
可以根据用户 ID 和频道 ID 来查询召回结果
hbase(main):028:0> get 'cb_recall', 'recall:user:2'
COLUMN CELL
content:13 timestamp=1558041569201, value=[141431,14381, 17966, 17454, 14125, 16174]
最后,使用 Apscheduler 定时更新。在用户召回方法 update_user_recall()
中,增加基于内容的离线召回方法 update_content_recall()
,首先读取用户行为日志,并筛选用户点击的文章,接着读取文章相似表,获取相似度最高的 K 篇文章,然后根据历史召回结果进行过滤,防止重复推荐,最后,按频道分别存入召回结果表及历史召回结果表
def update_user_recall():
"""
用户的频道推荐召回结果更新逻辑
:return:
"""
ur = UpdateRecall(500)
ur.update_als_recall()
ur.update_content_recall()
之前已经添加好了定时更新用户召回结果的任务,每隔 3 小时运行一次,这样就完成了基于内容的离线召回。
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
# 创建scheduler,多进程执行
executors = {
'default': ProcessPoolExecutor(3)
}
scheduler = BlockingScheduler(executors=executors)
# 添加一个定时运行文章画像更新的任务, 每隔1个小时运行一次
scheduler.add_job(update_article_profile, trigger='interval', hours=1)
# 添加一个定时运行用户画像更新的任务, 每隔2个小时运行一次
scheduler.add_job(update_user_profile, trigger='interval', hours=2)
# 添加一个定时运行用户召回更新的任务,每隔3小时运行一次
scheduler.add_job(update_user_recall, trigger='interval', hours=3)
# 添加一个定时运行特征中心平台的任务,每隔4小时更新一次
scheduler.add_job(update_ctr_feature, trigger='interval', hours=4)
scheduler.start()
在线召回
前面我们实现了基于内容的离线召回,接下来我们将实现基于内容的在线召回。在线召回的实时性更好,能够根据用户的线上行为实时反馈,快速跟踪用户的偏好,也能够解决用户冷启动问题。离线召回和在线召回唯一的不同就是,离线召回读取的是用户历史行为数据,而在线召回读取的是用户实时的行为数据,从而召回用户当前正在阅读的文章的相似文章。
首先,我们通过 Spark Streaming 读取 Kafka 中的用户实时行为数据,Spark Streaming 配置如下
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from setting.default import DefaultConfig
import happybase
SPARK_ONLINE_CONFIG = (
("spark.app.name", "onlineUpdate"),
("spark.master", "yarn"),
("spark.executor.instances", 4)
)
KAFKA_SERVER = "192.168.19.137:9092"
# 用于读取hbase缓存结果配置
pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
conf = SparkConf()
conf.setAll(SPARK_ONLINE_CONFIG)
sc = SparkContext(conf=conf)
stream_c = StreamingContext(sc, 60)
# 基于内容召回配置,用于收集用户行为
similar_kafkaParams = {"metadata.broker.list": DefaultConfig.KAFKA_SERVER, "group.id": 'similar'}
SIMILAR_DS = KafkaUtils.createDirectStream(stream_c, ['click-trace'], similar_kafkaParams)
Kafka 中的用户行为数据,如下所示
{"actionTime":"2019-12-10 21:04:39","readTime":"","channelId":18,"param":{"action": "click", "userId": "2", "articleId": "116644", "algorithmCombine": "C2"}}
接下来,利用 Spark Streaming 将用户行为数据传入到 get_similar_online_recall()
方法中,这里利用 json.loads()
方法先将其转换为了 json 格式,注意用户行为数据在每条 Kafka 消息的第二个位置
SIMILAR_DS.map(lambda x: json.loads(x[1])).foreachRDD(get_similar_online_recall)
接着,遍历用户行为数据,这里可能每次读取到多条用户行为数据。筛选出被点击、收藏或分享过的文章,并获取与其相似度最高的 K 篇文章,再根据历史召回结果表进行过滤,防止重复推荐,最后,按频道分别存入召回结果表及历史召回结果表
def get_online_similar_recall(rdd):
"""
获取在线相似文章
:param rdd:
:return:
"""
import happybase
topk = 10
# 初始化happybase连接
pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
for data in rdd.collect():
# 根据用户行为筛选文章
if data['param']['action'] in ["click", "collect", "share"]:
with pool.connection() as conn:
similar_table = conn.table("article_similar")
# 根据用户行为数据涉及文章找出与之最相似文章(基于内容的相似)
similar_article = similar_table.row(str(data["param"]["articleId"]).encode(), columns=[b"similar"])
similar_article = sorted(similar_article.items(), key=lambda x: x[1], reverse=True) # 按相似度排序
if similar_article:
similar_article_topk = [int(i[0].split(b":")[1]) for i in similar_article[:topk]] # 选取K篇作为召回推荐结果
# 根据历史召回结果进行过滤
history_table = conn.table('history_recall')
history_article_data = history_table.cells(b"reco:his:%s" % data["param"]["userId"].encode(), b"channel:%d" % data["channelId"])
# 将多个版本都加入历史文章ID列表
history_article = []
if len(history_article_data) >1:
for article in history_article_data[:-1]:
history_article.extend(eval(article))
else:
history_article = []
# 过滤history_article
recall_article = list(set(similar_article_topk) - set(history_article))
# 如果有召回结果,按频道分别存入召回结果表及历史召回结果表
if recall_article:
recall_table = conn.table("cb_recall")
recall_table.put(b"recall:user:%s" % data["param"]["userId"].encode(), {b"online:%d" % data["channelId"]: str(recall_article).encode()})
history_table.put(b"reco:his:%s" % data["param"]["userId"].encode(), {b"channel:%d" % data["channelId"]: str(recall_article).encode()})
conn.close()
可以根据用户 ID 和频道 ID 来查询召回结果
hbase(main):028:0> get 'cb_recall', 'recall:user:2'
COLUMN CELL
online:13 timestamp=1558041569201, value=[141431,14381, 17966, 17454, 14125, 16174]
创建 online_update.py
,加入基于内容的在线召回逻辑
if __name__ == '__main__':
ore = OnlineRecall()
ore.update_content_recall()
stream_sc.start()
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
pass
利用 Supervisor 进行进程管理,并开启实时运行,配置如下,其中 environment 需要指定运行所需环境
[program:online]
environment=JAVA_HOME=/root/bigdata/jdk,SPARK_HOME=/root/bigdata/spark,HADOOP_HOME=/root/bigdata/hadoop,PYSPARK_PYTHON=/miniconda2/envs/reco_sys/bin/python ,PYSPARK_DRIVER_PYTHON=/miniconda2/envs/reco_sys/bin/python,PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.2 pyspark-shell'
command=/miniconda2/envs/reco_sys/bin/python /root/toutiao_project/reco_sys/online/online_update.py
directory=/root/toutiao_project/reco_sys/online
user=root
autorestart=true
redirect_stderr=true
stdout_logfile=/root/logs/onlinesuper.log
loglevel=info
stopsignal=KILL
stopasgroup=true
killasgroup=true
参考
https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(学习资源已保存至网盘, 提取码:eakp)