文章推荐系统 | 七、构建离线文章特征和用户特征

推荐阅读:
文章推荐系统 | 一、推荐流程设计
文章推荐系统 | 二、同步业务数据
文章推荐系统 | 三、收集用户行为数据
文章推荐系统 | 四、构建离线文章画像
文章推荐系统 | 五、计算文章相似度
文章推荐系统 | 六、构建离线用户画像

前面我们完成了文章画像和用户画像的构建,画像数据主要是提供给召回阶段的各种召回算法使用。接下来,我们还要为排序阶段的各种排序模型做数据准备,通过特征工程将画像数据进一步加工为特征数据,以供排序模型直接使用。

我们可以将特征数据存储到 Hbase 中,这里我们先在 Hbase 中创建好 ctr_feature_article 表 和 ctr_feature_user 表,分别存储文章特征数据和用户特征数据

-- 文章特征表
create 'ctr_feature_article', 'article'
-- 如 article:13401 timestamp=1555635749357, value=[18.0,0.08196639249252607,0.11217275332895373,0.1353835167902181,0.16086650318453152,0.16356418791892943,0.16740082750337945,0.18091837445730974,0.1907214431716628,0.2........................-0.04634634410271921,-0.06451843378804649,-0.021564142420785692,0.10212902152136256]

-- 用户特征表
create 'ctr_feature_user', 'channel'
-- 如 4 column=channel:13, timestamp=1555647172980, value=[]

构建文章特征

文章特征包括文章关键词权重、文章频道以及文章向量,我们首先读取文章画像

spark.sql("use article")
article_profile = spark.sql("select * from article_profile")

在文章画像中筛选出权重最高的 K 个关键词的权重,作为文章关键词的权重向量

def article_profile_to_feature(row):
    try:
        article_weights = sorted(row.keywords.values())[:10]
    except Exception as e:
        article_weights = [0.0] * 10
    return row.article_id, row.channel_id, article_weights
article_profile = article_profile.rdd.map(article_profile_to_feature).toDF(['article_id', 'channel_id', 'weights'])

article_profile 结果如下所示,weights 即为文章关键词的权重向量

接下来,读取文章向量信息,再将频道 ID 和文章向量加入进来,利用 article_id 将 article_profile 和 article_vector 进行内连接,并将 weights 和 articlevector 转为 vector 类型

article_vector = spark.sql("select * from article_vector")
article_feature = article_profile.join(article_vector, on=['article_id'], how='inner')

def feature_to_vector(row):

    from pyspark.ml.linalg import Vectors

    return row.article_id, row.channel_id, Vectors.dense(row.weights), Vectors.dense(row.articlevector)

article_feature = article_feature.rdd.map(feature_to_vector).toDF(['article_id', 'channel_id', 'weights', 'articlevector'])

最后,我们将 channel_id, weights, articlevector 合并为一列 features 即可(通常 channel_id 可以进行 one-hot 编码,我们这里先省略了)

from pyspark.ml.feature import VectorAssembler

columns = ['article_id', 'channel_id', 'weights', 'articlevector']
article_feature = VectorAssembler().setInputCols(columns[1:4]).setOutputCol("features").transform(article_feature)

article_feature 结果如下所示,features 就是我们准备好的文章特征

最后,将文章特征结果保存到 Hbase 中

def save_article_feature_to_hbase(partition):
    import happybase
    pool = happybase.ConnectionPool(size=10, host='hadoop-master')
    with pool.connection() as conn:
        table = conn.table('ctr_feature_article')
        for row in partition:
            table.put('{}'.format(row.article_id).encode(),
                     {'article:{}'.format(row.article_id).encode(): str(row.features).encode()})

article_feature.foreachPartition(save_article_feature_to_hbase)

构建用户特征

由于用户在不同频道的偏好差异较大,所以我们要计算用户在每个频道的特征。首先读取用户画像,将空值列删除

spark.sql("use profile")

user_profile_hbase = spark.sql("select user_id, information.birthday, information.gender, article_partial, env from user_profile_hbase")

user_profile_hbase 结果如下所示,其中 article_partial 为用户标签及权重,如 (['18:vars': 0.2, '18: python':0.2, ...], ['19:java': 0.2, '19: javascript':0.2, ...], ...) 表示某个用户在 18 号频道的标签包括 var、python 等,在 19 号频道的标签包括 java、javascript等。

由于 gender 和 birthday 两列空值较多,我们将这两列去除(实际场景中也可以根据数据情况选择填充)

# 去除空值列
user_profile_hbase = user_profile_hbase.drop('env', 'birthday', 'gender')

提取用户 ID,获取 user_id 列的内容中 : 后面的数值即为用户 ID

def get_user_id(row):
    return int(row.user_id.split(":")[1]), row.article_partial

user_profile_hbase = user_profile_hbase.rdd.map(get_user_id)

user_profile_hbase 转为 DataFrame 类型

from pyspark.sql.types import *

_schema = StructType([
    StructField("user_id", LongType()),
    StructField("weights", MapType(StringType(), DoubleType()))
])

user_profile_hbase = spark.createDataFrame(user_profile_hbase, schema=_schema)

接着,将每个频道内权重最高的 K 个标签的权重作为用户标签权重向量

def frature_preprocess(row):

    from pyspark.ml.linalg import Vectors

    user_weights = []
    for i in range(1, 26):
        try:
            channel_weights = sorted([row.weights[key] for key in row.weights.keys() if key.split(':')[0] == str(i)])[:10]
            user_weights.append(channel_weights)
        except:
            user_weights.append([0.0] * 10)
    return row.user_id, user_weights

user_features = user_profile_hbase.rdd.map(frature_preprocess).collect()

user_features 就是我们计算好的用户特征,数据结构类似 (10, [[0.2, 2.1, ...], [0.2, 2.1, ...]], ...),其中元组第一个元素 10 即为用户 ID,第二个元素是长度为 25 的用户频道标签权重列表,列表中每个元素是长度为 K 的用户标签权重列表,代表用户在某个频道下的标签权重向量。

最后,将用户特征结果保存到 Hbase,利用 Spark 的 batch() 方法,按频道批量存储用户特征

import happybase

# 批量插入Hbase数据库中
pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
with pool.connection() as conn:
    ctr_feature = conn.table('ctr_feature_user')
    with ctr_feature.batch(transaction=True) as b:
        for i in range(len(user_features)):
            for j in range(25):
                b.put("{}".format(res[i][0]).encode(),{"channel:{}".format(j+1).encode(): str(res[i][1][j]).encode()})
    conn.close()

Apscheduler 定时更新

定义文章特征和用户特征的离线更新方法

def update_ctr_feature():
    """
    更新文章特征和用户特征
    :return:
    """
    fp = FeaturePlatform()
    fp.update_user_ctr_feature_to_hbase()
    fp.update_article_ctr_feature_to_hbase()

在 Apscheduler 中添加定时更新文章特征和用户特征的任务,每隔 4 小时运行一次

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor

# 创建scheduler,多进程执行
executors = {
    'default': ProcessPoolExecutor(3)
}

scheduler = BlockingScheduler(executors=executors)

# 添加一个定时运行文章画像更新的任务, 每隔1个小时运行一次
scheduler.add_job(update_article_profile, trigger='interval', hours=1)
# 添加一个定时运行用户画像更新的任务, 每隔2个小时运行一次
scheduler.add_job(update_user_profile, trigger='interval', hours=2)
# 添加一个定时运行特征中心平台的任务,每隔4小时更新一次
scheduler.add_job(update_ctr_feature, trigger='interval', hours=4)

scheduler.start()

参考

https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(学习资源已保存至网盘, 提取码:eakp)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容