用户画像往往是大型网站的重要模块,基于用户画像不仅可以实现个性化推荐,还可以实现用户分群、精准推送、精准营销以及用户行为预测、商业化转化分析等,为商业决策提供数据支持 。通常用户画像包括用户属性信息(性别、年龄、出生日期等)、用户行为信息(浏览、收藏、点赞等)以及环境信息(时间、地理位置等) 。
处理用户行为数据在数据准备阶段,我们通过 Flume 已经可以将用户行为数据收集到 Hive 的 user_action 表的 HDFS 路径中,先来看一下这些数据长什么样子,我们读取当天的用户行为数据,注意读取之前要先关联分区
_day = time.strftime("%Y-%m-%d", time.localtime())_localions = '/user/hive/warehouse/profile.db/user_action/' + _dayif fs.exists(_localions):# 如果有该文件直接关联,捕获关联重复异常try:self.spark.sql("alter table user_action add partition (dt='%s') location '%s'" % (_day, _localions))except Exception as e:passself.spark.sql("use profile")user_action = self.spark.sql("select actionTime, readTime, channelId, param.articleId, param.algorithmCombine, param.action, param.userId from user_action where dt>=" + _day)user_action 结果如下所示

文章插图
useraction
可以发现,上面的一条记录代表用户对文章的一次行为,但通常我们需要查询某个用户对某篇文章的所有行为,所以,我们要将这里用户对文章的多条行为数据合并为一条,其中包括用户对文章的所有行为 。我们需要新建一个 Hive 表 user_article_basic,这张表包括了用户 ID、文章 ID、是否曝光、是否点击、阅读时间等等,随后我们将处理好的用户行为数据存储到此表中
create table user_article_basic(user_idBIGINT comment "userID",action_time STRING comment "user actions time",article_idBIGINT comment "articleid",channel_idINT comment "channel_id",sharedBOOLEAN comment "is shared",clickedBOOLEAN comment "is clicked",collectedBOOLEAN comment "is collected",exposureBOOLEAN comment "is exposured",read_timeSTRING comment "reading time")COMMENT "user_article_basic"CLUSTERED by (user_id) into 2 bucketsSTORED as textfileLOCATION '/user/hive/warehouse/profile.db/user_article_basic';遍历每一条原始用户行为数据,判断用户对文章的行为,在 user_action_basic 中将该用户与该文章对应的行为设置为 Trueif user_action.collect():def _generate(row):_list = []if row.action == 'exposure':for article_id in eval(row.articleId):# ["user_id", "action_time","article_id", "channel_id", "shared", "clicked", "collected", "exposure", "read_time"]_list.Append([row.userId, row.actionTime, article_id, row.channelId, False, False, False, True, row.readTime])return _listelse:class Temp(object):shared = Falseclicked = Falsecollected = Falseread_time = ""_tp = Temp()if row.action == 'click':_tp.clicked = Trueelif row.action == 'share':_tp.shared = Trueelif row.action == 'collect':_tp.collected = Trueelif row.action == 'read':_tp.clicked = True_list.append([row.userId, row.actionTime, int(row.articleId), row.channelId, _tp.shared, _tp.clicked, _tp.collected,True, row.readTime])return _listuser_action_basic = user_action.rdd.flatMap(_generate)user_action_basic = user_action_basic.toDF(["user_id", "action_time", "article_id", "channel_id", "shared", "clicked", "collected", "exposure","read_time"])user_action_basic 结果如下所示,这里的一条记录包括了某个用户对某篇文章的所有行为
文章插图
user_action_basic
由于 Hive 目前还不支持 pyspark 的原子性操作,所以 user_article_basic 表的用户行为数据只能全量更新(实际场景中可以选择其他语言或数据库实现) 。这里,我们需要将当天的用户行为与 user_action_basic 的历史用户行为进行合并
old_data = https://www.isolves.com/it/wlyx/wzjs/2020-09-02/uup.spark.sql("select * from user_article_basic")new_data = old_data.unionAll(user_action_basic)合并后又会产生一个新的问题,那就是用户 ID 和文章 ID 可能重复,因为今天某个用户对某篇文章的记录可能在历史数据中也存在,而 unionAll() 方法并没有去重,这里我们可以按照用户 ID 和文章 ID 进行分组,利用 max() 方法得到 action_time, channel_id, shared, clicked, collected, exposure, read_time 即可,去重后直接存储到 user_article_basic 表中
推荐阅读
- 十大深圳美食推荐
- 护肤品|别只看推荐 看她们自己用什么护肤品 美妆博主自用8款护肤品
- 护肤品|抗衰老越早越好 别等老了后悔 5款全球公认的抗衰老护肤品推荐
- 黄菊散热白菊明目,为您推荐几款养肝明目茶
- 6款相见恨晚的手机APP分享,用过就舍不得卸载,强烈推荐给你们
- 老茶收藏家赵玉光,八角亭2015年品味布朗产品推荐
- 推荐一款好用的SSH工具,你值得拥有
- 方便携带的蓝牙耳机有哪些,方便小巧佩戴舒适的蓝牙耳机推荐
- 什么洗发水最好 十大洗发水品牌推荐
- 今日给大家推荐两款redis可视化工具
