文章推荐系统 | 九、基于内容的离线及在线召回

推荐阅读:
文章推荐系统 | 一、推荐流程设计
文章推荐系统 | 二、同步业务数据
文章推荐系统 | 三、收集用户行为数据
文章推荐系统 | 四、构建离线文章画像
文章推荐系统 | 五、计算文章相似度
文章推荐系统 | 六、构建离线用户画像
文章推荐系统 | 七、构建离线文章特征和用户特征
文章推荐系统 | 八、基于模型的离线召回

在上篇文章中,我们实现了基于模型的离线召回,属于基于协同过滤的召回算法。接下来,本文就讲一下另一个经典的召回方式,那就是如何实现基于内容的离线召回。相比于协同过滤来说,基于内容的召回会简单很多,主要思路就是召回用户点击过的文章的相似文章,通常也被叫做 u2i2i。

离线召回

首先,读取用户历史行为数据,得到用户历史点击过的文章

spark.sql('use profile')
user_article_basic = spark.sql("select * from user_article_basic")
user_article_basic = user_article_basic.filter('clicked=True')

user_article_basic 结果如下所示

接下来,遍历用户历史点击过的文章,获取与之相似度最高的 K 篇文章即可。可以根据之前计算好的文章相似度表 article_similar 进行相似文章查询,接着根据历史召回结果进行过滤,防止重复推荐。最后将召回结果按照频道分别存入召回结果表及历史召回结果表

user_article_basic.foreachPartition(get_clicked_similar_article)

def get_clicked_similar_article(partition):
    """召回用户点击文章的相似文章
    """
    import happybase
    pool = happybase.ConnectionPool(size=10, host='hadoop-master')
    
    with pool.connection() as conn:
        similar_table = conn.table('article_similar')
        for row in partition:
            # 读取文章相似度表,根据文章ID获取相似文章
            similar_article = similar_table.row(str(row.article_id).encode(),
                                                columns=[b'similar'])
            # 按照相似度进行排序
            similar_article_sorted = sorted(similar_article.items(), key=lambda item: item[1], reverse=True)
            if similar_article_sorted:
                # 每次行为推荐10篇文章
                similar_article_topk = [int(i[0].split(b':')[1]) for i in similar_article_sorted][:10]

                # 根据历史召回结果进行过滤
                history_table = conn.table('history_recall')
                history_article_data = history_table.cells('reco:his:{}'.format(row.user_id).encode(), 'channel:{}'.format(row.channel_id).encode())
                # 将多个版本都加入历史文章ID列表
                history_article = []
                if len(history_article_data) >= 2:
                    for article in history_article_data[:-1]:
                        history_article.extend(eval(article))
                else:
                    history_article = []

                # 过滤history_article
                recall_article = list(set(similar_article_topk) - set(history_article))

                # 存储到召回结果表及历史召回结果表
                if recall_article:
                    content_table = conn.table('cb_recall')
                    content_table.put("recall:user:{}".format(row.user_id).encode(), {'content:{}'.format(row.channel_id).encode(): str(recall_article).encode()})

                    # 放入历史召回结果表
                    history_table.put("reco:his:{}".format(row.user_id).encode(), {'channel:{}'.format(row.channel_id).encode(): str(recall_article).encode()})

可以根据用户 ID 和频道 ID 来查询召回结果

hbase(main):028:0> get 'cb_recall', 'recall:user:2'
COLUMN                     CELL                                                                        
content:13                    timestamp=1558041569201, value=[141431,14381, 17966, 17454, 14125, 16174]   

最后,使用 Apscheduler 定时更新。在用户召回方法 update_user_recall() 中,增加基于内容的离线召回方法 update_content_recall(),首先读取用户行为日志,并筛选用户点击的文章,接着读取文章相似表,获取相似度最高的 K 篇文章,然后根据历史召回结果进行过滤,防止重复推荐,最后,按频道分别存入召回结果表及历史召回结果表

def update_user_recall():
    """
    用户的频道推荐召回结果更新逻辑
    :return:
    """
    ur = UpdateRecall(500)
    ur.update_als_recall()
    ur.update_content_recall()

之前已经添加好了定时更新用户召回结果的任务,每隔 3 小时运行一次,这样就完成了基于内容的离线召回。

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor

# 创建scheduler,多进程执行
executors = {
    'default': ProcessPoolExecutor(3)
}

scheduler = BlockingScheduler(executors=executors)

# 添加一个定时运行文章画像更新的任务, 每隔1个小时运行一次
scheduler.add_job(update_article_profile, trigger='interval', hours=1)
# 添加一个定时运行用户画像更新的任务, 每隔2个小时运行一次
scheduler.add_job(update_user_profile, trigger='interval', hours=2)
# 添加一个定时运行用户召回更新的任务,每隔3小时运行一次
scheduler.add_job(update_user_recall, trigger='interval', hours=3)
# 添加一个定时运行特征中心平台的任务,每隔4小时更新一次
scheduler.add_job(update_ctr_feature, trigger='interval', hours=4)

scheduler.start()

在线召回

前面我们实现了基于内容的离线召回,接下来我们将实现基于内容的在线召回。在线召回的实时性更好,能够根据用户的线上行为实时反馈,快速跟踪用户的偏好,也能够解决用户冷启动问题。离线召回和在线召回唯一的不同就是,离线召回读取的是用户历史行为数据,而在线召回读取的是用户实时的行为数据,从而召回用户当前正在阅读的文章的相似文章。

首先,我们通过 Spark Streaming 读取 Kafka 中的用户实时行为数据,Spark Streaming 配置如下

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from setting.default import DefaultConfig
import happybase

SPARK_ONLINE_CONFIG = (
        ("spark.app.name", "onlineUpdate"), 
        ("spark.master", "yarn"),
        ("spark.executor.instances", 4)
    )

KAFKA_SERVER = "192.168.19.137:9092"

# 用于读取hbase缓存结果配置
pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
conf = SparkConf()
conf.setAll(SPARK_ONLINE_CONFIG)
sc = SparkContext(conf=conf)
stream_c = StreamingContext(sc, 60)

# 基于内容召回配置,用于收集用户行为
similar_kafkaParams = {"metadata.broker.list": DefaultConfig.KAFKA_SERVER, "group.id": 'similar'}
SIMILAR_DS = KafkaUtils.createDirectStream(stream_c, ['click-trace'], similar_kafkaParams)

Kafka 中的用户行为数据,如下所示

{"actionTime":"2019-12-10 21:04:39","readTime":"","channelId":18,"param":{"action": "click", "userId": "2", "articleId": "116644", "algorithmCombine": "C2"}}

接下来,利用 Spark Streaming 将用户行为数据传入到 get_similar_online_recall() 方法中,这里利用 json.loads() 方法先将其转换为了 json 格式,注意用户行为数据在每条 Kafka 消息的第二个位置

SIMILAR_DS.map(lambda x: json.loads(x[1])).foreachRDD(get_similar_online_recall)

接着,遍历用户行为数据,这里可能每次读取到多条用户行为数据。筛选出被点击、收藏或分享过的文章,并获取与其相似度最高的 K 篇文章,再根据历史召回结果表进行过滤,防止重复推荐,最后,按频道分别存入召回结果表及历史召回结果表

def get_online_similar_recall(rdd):
    """
    获取在线相似文章
    :param rdd:
    :return:
    """
    import happybase

    topk = 10
    # 初始化happybase连接
    pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
    for data in rdd.collect():

        # 根据用户行为筛选文章
        if data['param']['action'] in ["click", "collect", "share"]:
            with pool.connection() as conn:
                similar_table = conn.table("article_similar")

                # 根据用户行为数据涉及文章找出与之最相似文章(基于内容的相似)
                similar_article = similar_table.row(str(data["param"]["articleId"]).encode(), columns=[b"similar"])
                similar_article = sorted(similar_article.items(), key=lambda x: x[1], reverse=True)  # 按相似度排序

                if similar_article:
                    similar_article_topk = [int(i[0].split(b":")[1]) for i in similar_article[:topk]] # 选取K篇作为召回推荐结果

                    # 根据历史召回结果进行过滤
                    history_table = conn.table('history_recall')
                    history_article_data = history_table.cells(b"reco:his:%s" % data["param"]["userId"].encode(), b"channel:%d" % data["channelId"])
                    # 将多个版本都加入历史文章ID列表
                    history_article = []
                    if len(history_article_data) >1:
                        for article in history_article_data[:-1]:
                            history_article.extend(eval(article))
                    else:
                        history_article = []

                    # 过滤history_article
                    recall_article = list(set(similar_article_topk) - set(history_article))

                    # 如果有召回结果,按频道分别存入召回结果表及历史召回结果表
                    if recall_article:
                        recall_table = conn.table("cb_recall")
                        recall_table.put(b"recall:user:%s" % data["param"]["userId"].encode(), {b"online:%d" % data["channelId"]: str(recall_article).encode()})
                        history_table.put(b"reco:his:%s" % data["param"]["userId"].encode(), {b"channel:%d" % data["channelId"]: str(recall_article).encode()})

                conn.close()

可以根据用户 ID 和频道 ID 来查询召回结果

hbase(main):028:0> get 'cb_recall', 'recall:user:2'
COLUMN                     CELL                                                                        
online:13                    timestamp=1558041569201, value=[141431,14381, 17966, 17454, 14125, 16174]   

创建 online_update.py,加入基于内容的在线召回逻辑

if __name__ == '__main__':
    ore = OnlineRecall()
    ore.update_content_recall()
    stream_sc.start()
    _ONE_DAY_IN_SECONDS = 60 * 60 * 24
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        pass

利用 Supervisor 进行进程管理,并开启实时运行,配置如下,其中 environment 需要指定运行所需环境

[program:online]
environment=JAVA_HOME=/root/bigdata/jdk,SPARK_HOME=/root/bigdata/spark,HADOOP_HOME=/root/bigdata/hadoop,PYSPARK_PYTHON=/miniconda2/envs/reco_sys/bin/python ,PYSPARK_DRIVER_PYTHON=/miniconda2/envs/reco_sys/bin/python,PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.2 pyspark-shell'
command=/miniconda2/envs/reco_sys/bin/python /root/toutiao_project/reco_sys/online/online_update.py
directory=/root/toutiao_project/reco_sys/online
user=root
autorestart=true
redirect_stderr=true
stdout_logfile=/root/logs/onlinesuper.log
loglevel=info
stopsignal=KILL
stopasgroup=true
killasgroup=true

参考

https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(学习资源已保存至网盘, 提取码:eakp)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342