The UserCF algorithm and its description

ItemCF-based algorithm

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import math
from operator import itemgetter

# ItemCF algorithm: item-item co-occurrence normalized by item popularity
def ItemSimilarity(train):
    C = dict()  # C[i][j]: number of users who interacted with both i and j
    N = dict()  # N[i]: number of users who interacted with item i
    for u, items in train.items():
        for i in items.keys():
            N[i] = N.get(i, 0) + 1
            for j in items.keys():
                if i == j:
                    continue
                C.setdefault(i, {})
                C[i][j] = C[i].get(j, 0) + 1
    W = dict()  # W[i][j]: similarity between items i and j
    for i, related_items in C.items():
        W[i] = dict()
        for j, cij in related_items.items():
            W[i][j] = cij / math.sqrt(N[i] * N[j])
    return W

# ItemCF-IUF algorithm: down-weight the contribution of very active users
def ItemSimilarity_v2(train):
    C = dict()
    N = dict()
    for u, items in train.items():
        for i in items.keys():
            N[i] = N.get(i, 0) + 1
            for j in items.keys():
                if i == j:
                    continue
                C.setdefault(i, {})
                C[i][j] = C[i].get(j, 0) + 1 / math.log(1 + len(items) * 1.0)
    W = dict()
    for i, related_items in C.items():
        W[i] = dict()
        for j, cij in related_items.items():
            W[i][j] = cij / math.sqrt(N[i] * N[j])
    return W

def Recommend(train, user_id, W, K):
    rank = dict()
    ru = train[user_id]  # items the user has already interacted with
    for i, pi in ru.items():
        # take the K items most similar to i
        for j, wj in sorted(W[i].items(), key=itemgetter(1), reverse=True)[0:K]:
            if j in ru:
                continue
            rank[j] = rank.get(j, 0) + pi * wj
    return rank
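As a quick illustration (my own toy data, not part of the original post), the two functions above can be exercised like this:

# Toy walk-through of the ItemCF functions above (assumed sample data)
train = {
    'u1': {'a': 1, 'b': 1, 'd': 1},
    'u2': {'b': 1, 'c': 1, 'e': 1},
    'u3': {'c': 1, 'd': 1},
    'u4': {'b': 1, 'c': 1, 'd': 1},
}
W = ItemSimilarity(train)                             # item-item similarity matrix
print(sorted(Recommend(train, 'u1', W, 3).items()))   # scores for the unseen items 'c' and 'e'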
UserCF-based algorithm

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import math
from operator import itemgetter
'''
UserCF-based recommendation algorithms
'''
# UserCF algorithm
def UserSimilarity(train):
    # build the inverted item -> users table
    item_users = dict()
    for u, items in train.items():
        for i in items.keys():
            if i not in item_users:
                item_users[i] = set()
            item_users[i].add(u)
    C = dict()  # C[u][v]: number of items users u and v have in common
    N = dict()  # N[u]: number of items user u has interacted with
    for i, users in item_users.items():
        for u in users:
            N[u] = N.get(u, 0) + 1
            for v in users:
                if u == v:
                    continue
                C.setdefault(u, {})
                C[u][v] = C[u].get(v, 0) + 1
    W = dict()  # W[u][v]: similarity between users u and v
    for u, related_users in C.items():
        W[u] = dict()
        for v, cuv in related_users.items():
            W[u][v] = cuv / math.sqrt(N[u] * N[v])
    return W

# User-IIF algorithm: penalize popular items when counting common interests
def UserSimilarity_v2(train):
    item_users = dict()
    for u, items in train.items():
        for i in items.keys():
            if i not in item_users:
                item_users[i] = set()
            item_users[i].add(u)
    C = dict()
    N = dict()
    for i, users in item_users.items():
        for u in users:
            N[u] = N.get(u, 0) + 1
            for v in users:
                if u == v:
                    continue
                C.setdefault(u, {})
                C[u][v] = C[u].get(v, 0) + 1 / math.log(1 + len(users))
    W = dict()
    for u, related_users in C.items():
        W[u] = dict()
        for v, cuv in related_users.items():
            W[u][v] = cuv / math.sqrt(N[u] * N[v])
    return W

def Recommend(user, train, W, K):
    rank = dict()
    interacted_items = train[user]
    # look at the K users most similar to the target user
    for v, wuv in sorted(W[user].items(), key=itemgetter(1), reverse=True)[0:K]:
        for i, rvi in train[v].items():
            if i in interacted_items:
                continue
            rank[i] = rank.get(i, 0) + wuv * rvi
    return rank
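A minimal, assumed walk-through of the UserCF functions (toy ratings, not taken from the post):

train = {
    'u1': {'a': 5.0, 'b': 3.0},
    'u2': {'a': 4.0, 'c': 4.5},
    'u3': {'b': 2.0, 'c': 3.5, 'd': 4.0},
}
W = UserSimilarity(train)                             # user-user similarity
print(sorted(Recommend('u1', train, W, 2).items()))   # items u1 has not seen yet ('c' and 'd')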
Personalized recommendation with temporal context

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import math
from operator import itemgetter

def RecentPopularity(records, alpha, T):
    # records: list of (user, item, timestamp); only behavior before time T counts,
    # and more recent behavior contributes more
    ret = dict()
    for user, item, tm in records:
        if tm >= T:
            continue
        addToDict(ret, item, 1 / (1.0 + alpha * (T - tm)))
    return ret

def addToDict(dicts, item, value):
    dicts[item] = dicts.get(item, 0) + value

# ItemCF with time decay: co-occurrences that happen close in time count more
def ItemSimilarity(train, alpha):
    C = dict()
    N = dict()
    for u, items in train.items():
        for i, tui in items.items():
            N[i] = N.get(i, 0) + 1
            for j, tuj in items.items():
                if i == j:
                    continue
                C.setdefault(i, {})
                C[i][j] = C[i].get(j, 0) + 1 / (1 + alpha * abs(tui - tuj))
    W = dict()
    for i, related_items in C.items():
        W[i] = dict()
        for j, cij in related_items.items():
            W[i][j] = cij / math.sqrt(N[i] * N[j])
    return W

def RecommendItemCF(train, user_id, W, K, t0, alpha):
    rank = dict()
    ru = train[user_id]  # item -> timestamp of the user's action
    for i, tui in ru.items():
        for j, wj in sorted(W[i].items(),
                            key=itemgetter(1), reverse=True)[0:K]:
            if j in ru:
                continue
            # older actions contribute less to the recommendation score
            rank[j] = rank.get(j, 0) + wj / (1 + alpha * (t0 - tui))
    return rank

# UserCF with time decay on the co-rating timestamps
def UserSimilarity(train, alpha):
    item_users = dict()
    for u, items in train.items():
        for i, tui in items.items():
            if i not in item_users:
                item_users[i] = dict()
            item_users[i][u] = tui
    C = dict()
    N = dict()
    for i, users in item_users.items():
        for u, tui in users.items():
            N[u] = N.get(u, 0) + 1
            for v, tvi in users.items():
                if u == v:
                    continue
                C.setdefault(u, {})
                C[u][v] = C[u].get(v, 0) + 1 / (1 + alpha * abs(tui - tvi))
    W = dict()
    for u, related_users in C.items():
        W[u] = dict()
        for v, cuv in related_users.items():
            W[u][v] = cuv / math.sqrt(N[u] * N[v])
    return W

def RecommendUserCF(user, T, train, W, K, alpha):
    rank = dict()
    interacted_items = train[user]
    for v, wuv in sorted(W[user].items(), key=itemgetter(1),
                         reverse=True)[0:K]:
        for i, tvi in train[v].items():
            if i in interacted_items:
                continue
            rank[i] = rank.get(i, 0) + wuv / (1 + alpha * (T - tvi))
    return rank
Personalized recommendation with the LFM (latent factor model) algorithm

#!/usr/bin/env python
import random
'''
Example data shapes:
items      => {'12':'PHP','1203':'Storm','123':'Ubuntu'}
items_pool => [12,32,121,324,532,123,53,2]
user_items => {'1010':[12,]}
'''
def RandomSelectNagativeSample(items):
    # positive samples get label 1; negative samples are drawn from the global
    # items_pool and get label 0, roughly as many negatives as positives
    ret = dict()
    for i in items:
        ret[i] = 1
    n = 0
    for i in range(0, len(items) * 3):
        item = items_pool[random.randint(0, len(items_pool) - 1)]
        if item in ret:
            continue
        ret[item] = 0
        n += 1
        if n > len(items):
            break
    return ret

def InitModel(user_items, F):
    # initialize the user factors P[u][f] and item factors Q[i][f] to 1
    P = dict()
    Q = dict()
    for u, items in user_items.items():
        if u not in P:
            P[u] = {}
            for f in range(0, F):
                P[u][f] = 1
        for ii in items:
            if ii not in Q:
                Q[ii] = {}
                for f in range(0, F):
                    Q[ii][f] = 1
    return [P, Q]

def Predict(user, item, P, Q):
    return sum(P[user][f] * Q[item][f] for f in P[user])

def LatentFactorModel(user_items, F, N, alpha, lambda1):
    # stochastic gradient descent for N rounds with learning-rate decay
    [P, Q] = InitModel(user_items, F)
    for step in range(0, N):
        for user, items in user_items.items():
            samples = RandomSelectNagativeSample(items)
            for item, rui in samples.items():
                if item not in Q:  # negative samples may come from items_pool
                    Q[item] = dict((f, 1) for f in range(0, F))
                eui = rui - Predict(user, item, P, Q)
                for f in range(0, F):
                    P[user][f] += alpha * (eui * Q[item][f] - lambda1 * P[user][f])
                    Q[item][f] += alpha * (eui * P[user][f] - lambda1 * Q[item][f])
        alpha *= 0.9
    return [P, Q]

def Recommend(user, P, Q):
    rank = dict()
    for f, puf in P[user].items():
        for i, qi in Q.items():
            rank[i] = rank.get(i, 0) + puf * qi[f]
    return rank
if __name__ == '__main__':
    # items_pool = {'12':'PHP','32':'Nginx','121':'Apache','324':'Erlang','532':'Linux','123':'Ubuntu','53':'Java','1203':'Storm','429':'Kafka','2932':'Flume'}
    items_pool = [12, 32, 121, 324, 532, 123, 53, 2]
    items = {'12': 'PHP', '1203': 'Storm', '123': 'Ubuntu'}
    user_items = {'1010': [12, ]}
    # print RandomSelectNagativeSample(items)
    print InitModel(user_items, 4)
Graph-based recommendation

#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
    Graph-based recommendation on the user-item bipartite graph (PersonalRank)
'''
def PersonalRank(G, alpha, root, maxsetup):
    # start all probability mass at the root node and iterate the random walk
    rank = dict()
    #rank = {x:0 for x in G.keys()}
    rank = rank.fromkeys(G.keys(), 0)
    rank[root] = 1
    for k in range(maxsetup):
        tmp = dict()
        #tmp = {x:0 for x in G.keys()}
        tmp = tmp.fromkeys(G.keys(), 0)
        for i, ri in G.items():
            for j, wij in ri.items():
                if j not in tmp:
                    tmp[j] = 0
                # walk from i to each neighbour j with probability alpha
                tmp[j] += alpha * rank[i] / (1.0 * len(ri))
                if j == root:
                    # with probability 1 - alpha the walk restarts at the root
                    tmp[j] += 1 - alpha
        rank = tmp
        print 'iter:' + str(k) + "\t",
        for key, value in rank.items():
            print "%s:%.3f,\t" % (key, value),
        print
    return rank

if __name__ == '__main__':
    G = {'A': {'a': 1, 'c': 1},
         'B': {'a': 1, 'b': 1, 'c': 1, 'd': 1},
         'C': {'c': 1, 'd': 1},
         'a': {'A': 1, 'B': 1},
         'b': {'B': 1},
         'c': {'A': 1, 'B': 1, 'C': 1},
         'd': {'B': 1, 'C': 1}}
    PersonalRank(G, 0.85, 'C', 20)
Tag-based recommendation

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import math

# Tag popularity: how many times each tag has been used
def TagPopularity(records):
    tagfreq = dict()
    for user, item, tag in records:
        if tag not in tagfreq:
            tagfreq[tag] = 1
        else:
            tagfreq[tag] += 1
    return tagfreq

# Cosine similarity between two items over their tag vectors
def CosineSim(item_tags, i, j):
    ret = 0
    for b, wib in item_tags[i].items():
        if b in item_tags[j]:
            ret += wib * item_tags[j][b]
    ni = 0
    nj = 0
    for b, w in item_tags[i].items():
        ni += w * w
    for b, w in item_tags[j].items():
        nj += w * w
    if ret == 0:
        return 0
    return ret / math.sqrt(ni * nj)

# Diversity of a recommendation list: average pairwise item similarity
def Diversity(item_tags, recommend_items):
    ret = 0
    n = 0
    for i in recommend_items.keys():
        for j in recommend_items.keys():
            if i == j:
                continue
            ret += CosineSim(item_tags, i, j)
            n += 1
    return ret / (n * 1.0)

def addValueToMat(dicts, index, k, v):
    if index not in dicts:
        dicts[index] = dict()
        dicts[index][k] = v
    else:
        if k not in dicts[index]:
            dicts[index][k] = v
        else:
            dicts[index][k] += v

def InitStat(records):
    user_tags = dict()   # user_tags[u][b] = n(u,b)
    tag_items = dict()   # tag_items[b][i] = n(b,i)
    user_items = dict()  # user_items[u][i] = n(u,i)
    for user, item, tag in records:
        addValueToMat(user_tags, user, tag, 1)
        addValueToMat(tag_items, tag, item, 1)
        addValueToMat(user_items, user, item, 1)
    return user_tags, tag_items, user_items

def Recommend(user, user_tags, tag_items, user_items):
    recommend_items = dict()
    tagged_items = user_items.get(user, {})
    for tag, wut in user_tags.get(user, {}).items():
        # wut = wut*1.0/math.log(1+len(tag_users[tag])) # TagBasedTFIDF and TagBasedTFIDF++
        for item, wti in tag_items[tag].items():
            # wti = wti*1.0/math.log(1+len(user_items[user])) # TagBasedTFIDF++
            if item in tagged_items:
                continue
            if item not in recommend_items:
                recommend_items[item] = wut * wti
            else:
                recommend_items[item] += wut * wti
    return recommend_items

if __name__ == '__main__':
    records = []  # list of (user, item, tag) triples
    user = '1220'
    user_tags, tag_items, user_items = InitStat(records)
    rec_items = Recommend(user, user_tags, tag_items, user_items)
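For completeness, a small assumed example of feeding (user, item, tag) triples through InitStat and Recommend (toy data, not from the original post):

records = [('u1', 'item1', 'linux'), ('u1', 'item2', 'hadoop'),
           ('u2', 'item2', 'linux'), ('u2', 'item3', 'hadoop')]
user_tags, tag_items, user_items = InitStat(records)
print(sorted(Recommend('u1', user_tags, tag_items, user_items).items()))  # recommends 'item3'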
The user-to-user interest similarity computed above uses plain cosine similarity. That formula is too coarse and needs refinement.
Take books as an example: the fact that two users both once bought the Xinhua Dictionary says almost nothing about shared interests, because most Chinese readers bought one as children. But if two users both bought Introduction to Data Mining, their interests are probably similar, because only people who work on data mining buy that book. In other words, two users acting on the same unpopular item is much stronger evidence that their interests are alike. For this reason, John S. Breese proposed, in his paper, the following way of computing interest similarity from user behavior (the User-IIF variant, UserSimilarity_v2 above, implements it).
In the formula, N(u) is the set of items user u has interacted with, the sum runs over the items that u and v have both interacted with, and N(i) is the set of users who have acted on item i. The 1/log(1+|N(i)|) factor penalizes the popular items among the common interests of u and v, so they contribute less to the similarity.
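Written out, the formula looks as follows; this is my reconstruction from the UserSimilarity_v2 / User-IIF code above rather than a verbatim quote of the paper:

$$ w_{uv} = \frac{\sum_{i \in N(u) \cap N(v)} \frac{1}{\log\left(1 + |N(i)|\right)}}{\sqrt{|N(u)|\,|N(v)|}} $$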
A MapReduce implementation of the recommender system (UserCF: user-based collaborative filtering)
How the user-based collaborative filtering (UserCF) algorithm works
User-based collaborative filtering measures the similarity between users from the ratings they give to items, and makes recommendations based on that similarity. Put simply: recommend to a user the items liked by other users whose tastes are similar to theirs.
1. Raw input data
2. The rating matrix
     101 102 103 104 105 106 107
[1,] 5.0 3.0 2.5 0.0 0.0   0   0
[2,] 2.0 2.5 5.0 2.0 0.0   0   0
[3,] 2.5 0.0 0.0 4.0 4.5   0   5
[4,] 5.0 0.0 3.0 4.5 0.0   4   0
[5,] 4.0 3.0 2.0 4.0 3.5   4   0
3. Conversion to the Euclidean similarity matrix
          [,1]      [,2]      [,3]      [,4]      [,5]
[1,] 0.0000000 0.6076560 0.2857143 1.0000000 1.0000000
[2,] 0.6076560 0.0000000 0.6532633 0.5568464 0.7761999
[3,] 0.2857143 0.6532633 0.0000000 0.5634581 1.0000000
[4,] 1.0000000 0.5568464 0.5634581 0.0000000 1.0000000
[5,] 1.0000000 0.7761999 1.0000000 1.0000000 0.0000000
How it is computed:
similarity = n / (1 + sqrt(sum((Xi - Yi)^2)))
That is, take the element-wise difference of the two rating vectors, square, sum, and take the square root; add 1 to the result; n is the number of element pairs that actually entered the difference (both ratings non-zero).
For example, the similarity between user 1 and user 2:
(5.0-2.0)^2 + (3.0-2.5)^2 + (2.5-5.0)^2 = 15.5   // only 3 element pairs are compared, because a difference is taken only where both ratings are non-zero
3 / (1 + sqrt(15.5)) = 0.607656
And between user 1 and user 4:
(5.0-5.0)^2 + (2.5-3.0)^2 = 0.25
2 / (1 + sqrt(0.25)) = 1.333333; since this exceeds 1, the similarity is taken as 1.000000 (the program later drops this cap).
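Both numbers can be checked with a few lines of Python (my sketch; the ratings come from the matrix in step 2):

import math

def euclid_sim(x, y):
    # only dimensions where both users have a non-zero rating are compared
    common = [(a, b) for a, b in zip(x, y) if a > 0 and b > 0]
    s = sum((a - b) ** 2 for a, b in common)
    return len(common) / (1 + math.sqrt(s))

u1 = [5.0, 3.0, 2.5, 0.0, 0.0, 0, 0]
u2 = [2.0, 2.5, 5.0, 2.0, 0.0, 0, 0]
u4 = [5.0, 0.0, 3.0, 4.5, 0.0, 4, 0]
print(round(euclid_sim(u1, u2), 6))  # 0.607656
print(round(euclid_sim(u1, u4), 6))  # 1.333333 (shown capped at 1.0 in the matrix above)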
4. Nearest-neighbor matrix
From the Euclidean similarity matrix, take each user's 2 most similar users, for example:
User 1's similarity ranking: 4[1.0], 5[1.0], 2[0.607], 3[0.285], 1[0.0]
5. Recommendation matrix, using user 1 as the example
User 1's two most similar users are users 4 and 5; their rating rows are:
  101 102 103 104 105 106 107
1 5.0 3.0 2.5 0.0 0.0 0.0 0.0
4 5.0 0.0 3.0 4.5 0.0 4   0
5 4.0 3.0 2.0 4.0 3.5 4   0
Remove the items user 1 has already bought (101, 102, 103); the remaining items, which user 1 has not bought, are the candidates for recommendation. The recommendation matrix is:
  101 102 103 104 105 106 107
4 0   0   0   4.5 0.0 4   0
5 0   0   0   4.0 3.5 4   0
6. Recommendation result for user 1
Scores of the items user 1 has not bought:
104[(4.5+4)/2 = 4.25], 106[(4+4)/2 = 4], 105[(0+3.5)/2 = 1.75], 107[(0+0)/2 = 0]
Finally the top 2 items are recommended:
recommended item   item score
[1] "104"          "4.25"
[2] "106"          "4"
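The same averaging can be reproduced directly (my sketch, using user 4's and user 5's rating rows from above):

items = ['101', '102', '103', '104', '105', '106', '107']
user1 = dict(zip(items, [5.0, 3.0, 2.5, 0.0, 0.0, 0.0, 0.0]))
user4 = dict(zip(items, [5.0, 0.0, 3.0, 4.5, 0.0, 4.0, 0.0]))
user5 = dict(zip(items, [4.0, 3.0, 2.0, 4.0, 3.5, 4.0, 0.0]))

scores = {}
for item in items:
    if user1[item] > 0:        # skip items user 1 already rated
        continue
    scores[item] = (user4[item] + user5[item]) / 2
print(sorted(scores.items(), key=lambda kv: -kv[1]))
# [('104', 4.25), ('106', 4.0), ('105', 1.75), ('107', 0.0)]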
7. Code implementation
The algorithm is implemented as a parallel Hadoop MapReduce job. Parallel UserCF implementations are hard to find online, so this one was written as an exercise. It is split into 5 steps:
Step 1: reorganize the input data to prepare for computing the Euclidean similarity matrix.
Step 2: from the step 1 output, compute the Euclidean similarity matrix.
Step 3: from the step 2 output, find each user's 2 most similar users.
Step 4: from the step 3 output plus the original data, score the items that each user's 2 most similar users have rated but the user has not bought, and output the recommendation matrix; the output is, per user, the average score of each candidate item.
Step 5: from the step 4 output, pick each user's top 3 recommended items.
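For orientation, these are the record shapes that flow between the steps, inferred from the sample outputs shown further down (so treat the exact field order as approximate):

# Record shapes between the steps (inferred from the sample outputs in section 8):
# step1: for every item, every pair of users who rated it -> "userA,userB <TAB> scoreA,scoreB"  e.g. "1,2  5.0,2.0"
# step2: "userA,userB <TAB> similarity"                                                          e.g. "1,2  0.6076560"
# step3: "user <TAB> neighbor1,sim1,neighbor2,sim2"                                              e.g. "1  5,1.4164079,4,1.3333333"
# step4: "user <TAB> item,avg_score" (one line per candidate item)                               e.g. "1  104,4.25"
# step5: "user <TAB> item1[score1],item2[score2],item3[score3]"                                  e.g. "1  104[4.25],106[4.00],105[1.75]"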
Main source files:
1) HdfsDAO.java — a small HDFS utility that wraps the common HDFS commands with the Hadoop API (see the referenced article).
2) UserCFHadoop.java — the main entry point; configures the directories and runs the steps.
3) UserCF_Step1.java — implementation of step 1.
4) UserCF_Step2.java — implementation of step 2.
5) UserCF_Step3.java — implementation of step 3.
6) UserCF_Step4.java — implementation of step 4.
7) UserCF_Step5.java — implementation of step 5.
Runtime environment:
1) CentOS 6.5
2) Hadoop 2.7.2
3) Java SDK 1.8.0
The main code follows:
1)HdfsDAO.java
package recommend.code1.hdfs;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;
public class HdfsDAO {

    private static final String HDFS = "hdfs://localhost:9000/";

    private String hdfsPath;
    private Configuration conf;

    public HdfsDAO(Configuration conf) {
        this(HDFS, conf);
    }

    public HdfsDAO(String hdfs, Configuration conf) {
        this.hdfsPath = hdfs;
        this.conf = conf;
    }

    public static void main(String[] args) throws IOException {
        JobConf conf = config();
        HdfsDAO hdfs = new HdfsDAO(conf);
        hdfs.mkdirs("/tmp/new");
        hdfs.copyFile("/home/yj/HadoopFile/userFile/small.csv", "/tmp/new");
        hdfs.ls("/tmp/new");
    }

    public static JobConf config() {
        JobConf conf = new JobConf(HdfsDAO.class);
        conf.setJobName("HdfsDAO");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public void mkdirs(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        if (!fs.exists(path)) {
            fs.mkdirs(path);
            System.out.println("Create: " + folder);
        }
        fs.close();
    }

    public void rmr(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.deleteOnExit(path);
        System.out.println("Delete: " + folder);
        fs.close();
    }

    public void ls(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FileStatus[] list = fs.listStatus(path);
        System.out.println("ls: " + folder);
        System.out.println("==========================================================");
        for (FileStatus f : list) {
            System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
        }
        System.out.println("==========================================================");
        fs.close();
    }

    public void createFile(String file, String content) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        byte[] buff = content.getBytes();
        FSDataOutputStream os = null;
        try {
            os = fs.create(new Path(file));
            os.write(buff, 0, buff.length);
            System.out.println("Create: " + file);
        } finally {
            if (os != null)
                os.close();
        }
        fs.close();
    }

    public void copyFile(String local, String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
        fs.close();
    }

    public void download(String remote, String local) throws IOException {
        Path path = new Path(remote);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyToLocalFile(path, new Path(local));
        System.out.println("download: from " + remote + " to " + local);
        fs.close();
    }

    public void cat(String remoteFile) throws IOException {
        Path path = new Path(remoteFile);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FSDataInputStream fsdis = null;
        System.out.println("cat: " + remoteFile);
        try {
            fsdis = fs.open(path);
            IOUtils.copyBytes(fsdis, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(fsdis);
            fs.close();
        }
    }

    public void location() throws IOException {
        // String folder = hdfsPath + "create/";
        // String file = "t2.txt";
        // FileSystem fs = FileSystem.get(URI.create(hdfsPath), new Configuration());
        // FileStatus f = fs.getFileStatus(new Path(folder + file));
        // BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());
        // System.out.println("File Location: " + folder + file);
        // for (BlockLocation bl : list) {
        //     String[] hosts = bl.getHosts();
        //     for (String host : hosts) {
        //         System.out.println("host:" + host);
        //     }
        // }
        // fs.close();
    }
}
2)UserCFHadoop.java
package recommend.code1.
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;

public class UserCFHadoop {

    public static final String HDFS = "hdfs://localhost:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    /**
     * @param args
     */
    public static void main(String[] args) {
        Map<String, String> path = new HashMap<String, String>();
        path.put("data", "/home/yj/HadoopFile/userFile/item.csv");  // local data file
        path.put("input_file", HDFS + "/user/yj/input/userCF/");    // HDFS working directory

        path.put("input_step1", path.get("input_file") + "/data");
        path.put("output_step1", path.get("input_file") + "/step1");
        path.put("input_step2", path.get("output_step1"));
        path.put("output_step2", path.get("input_file") + "/step2");
        path.put("input_step3", path.get("output_step2"));
        path.put("output_step3", path.get("input_file") + "/step3");
        path.put("input1_step4", path.get("output_step3"));
        path.put("input2_step4", path.get("input_step1"));
        path.put("output_step4", path.get("input_file") + "/step4");
        path.put("input_step5", path.get("output_step4"));
        path.put("output_step5", path.get("input_file") + "/step5");

        try {
            UserCF_Step1.run(path);
            UserCF_Step2.run(path);
            UserCF_Step3.run(path);
            UserCF_Step4.run(path);
            UserCF_Step5.run(path);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    public static Configuration config() { // configuration for the Hadoop cluster
        Configuration conf = new Configuration();
        return conf;
    }
}
3)UserCF_Step1.java
package recommend.code1.
import hadoop.myMapreduce.martrix.MainRun;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import recommend.code1.hdfs.HdfsDAO;

public class UserCF_Step1 {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = UserCFHadoop.DELIMITER.split(values.toString());
            if (tokens.length >= 3) {
                Text k = new Text(tokens[1]);                     // itemid
                Text v = new Text(tokens[0] + "," + tokens[2]);   // userid + score
                context.write(k, v);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<String, String> map = new HashMap<String, String>();
            for (Text line : values) {
                String val = line.toString();
                String[] vlist = UserCFHadoop.DELIMITER.split(val);
                if (vlist.length >= 2)
                    map.put(vlist[0], vlist[1]);
            }
            // emit every pair of users who rated this item, together with both scores
            Iterator<String> iterA = map.keySet().iterator();
            while (iterA.hasNext()) {
                String k1 = iterA.next();
                String v1 = map.get(k1);
                Iterator<String> iterB = map.keySet().iterator();
                while (iterB.hasNext()) {
                    String k2 = iterB.next();
                    String v2 = map.get(k2);
                    context.write(new Text(k1 + "," + k2), new Text(v1 + "," + v2));
                }
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = UserCFHadoop.config();
        String input  = path.get("input_step1");
        String output = path.get("output_step1");

        HdfsDAO hdfs = new HdfsDAO(UserCFHadoop.HDFS, conf);
        hdfs.rmr(path.get("input_file"));
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("data"), input);

        Job job = Job.getInstance(conf, "UserCF_Step1 job");
        job.setJarByClass(UserCF_Step1.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        System.out.println("input : " + input);
        System.out.println("output: " + output);
        if (!job.waitForCompletion(true))
            System.out.println("main run stop!");
        else
            System.out.println("main run successfully!");
    }
}
4)UserCF_Step2.java
package recommend.code1.
import hadoop.myMapreduce.martrix.MainRun;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.lang.Math;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import recommend.code1.hdfs.HdfsDAO;

public class UserCF_Step2 {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = UserCFHadoop.DELIMITER.split(values.toString());
            if (tokens.length >= 4) {
                Text k = new Text(tokens[0] + "," + tokens[1]);  // user pair
                Text v = new Text(tokens[2] + "," + tokens[3]);  // their two scores
                context.write(k, v);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            double sum = 0.0;
            double similarity = 0.0;
            int num = 0;
            for (Text line : values) {
                String val = line.toString();
                String[] vlist = UserCFHadoop.DELIMITER.split(val);
                if (vlist.length >= 2) {
                    sum += Math.pow((Double.parseDouble(vlist[0]) - Double.parseDouble(vlist[1])), 2);
                    num++;
                }
            }
            if (sum > 0.0)
                similarity = (double) num / (1 + Math.sqrt(sum));
            // if (similarity > 1.0) similarity = 1.0;  // cap removed so that values above 1 are kept (see the step 2 output)
            context.write(key, new Text(String.format("%.7f", similarity)));
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = UserCFHadoop.config();
        String input  = path.get("input_step2");
        String output = path.get("output_step2");

        Job job = Job.getInstance(conf, "UserCF_Step2 job");
        job.setJarByClass(UserCF_Step2.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        System.out.println("input : " + input);
        System.out.println("output: " + output);
        if (!job.waitForCompletion(true))
            System.out.println("main run stop!");
        else
            System.out.println("main run successfully!");
    }
}
5)UserCF_Step3.java
package recommend.code1.
import hadoop.myMapreduce.martrix.MainRun;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.lang.Math;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import recommend.code1.hdfs.HdfsDAO;

public class UserCF_Step3 {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = UserCFHadoop.DELIMITER.split(values.toString());
            if (tokens.length >= 3) {
                Text k = new Text(tokens[0]);                     // user
                Text v = new Text(tokens[1] + "," + tokens[2]);   // other user + similarity
                context.write(k, v);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        private final int NEIGHBORHOOD_NUM = 2;

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<Double, String> map = new HashMap<Double, String>();
            for (Text line : values) {
                String val = line.toString();
                String[] vlist = UserCFHadoop.DELIMITER.split(val);
                if (vlist.length >= 2)
                    map.put(Double.parseDouble(vlist[1]), vlist[0]);
            }
            List<Double> list = new ArrayList<Double>();
            Iterator<Double> iter = map.keySet().iterator();
            while (iter.hasNext()) {
                Double similarity = iter.next();
                list.add(similarity);
            }
            // sort the similarities in descending order with a comparator
            Collections.sort(list, new Comparator<Double>() {
                public int compare(Double o1, Double o2) {
                    return o2.compareTo(o1);
                }
            });
            // writing each neighbour on its own line would break step 4's parsing;
            // only the combined line below matches the step 3 sample output
            // for (int i = 0; i < NEIGHBORHOOD_NUM && i < list.size(); i++)
            //     context.write(key, new Text(map.get(list.get(i)) + "," + String.format("%.7f", list.get(i))));
            String v = "";
            for (int i = 0; i < NEIGHBORHOOD_NUM && i < list.size(); i++)
                v += "," + map.get(list.get(i)) + "," + String.format("%.7f", list.get(i));
            context.write(key, new Text(v.substring(1)));
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = UserCFHadoop.config();
        String input  = path.get("input_step3");
        String output = path.get("output_step3");

        Job job = Job.getInstance(conf, "UserCF_Step3 job");
        job.setJarByClass(UserCF_Step3.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        System.out.println("input : " + input);
        System.out.println("output: " + output);
        if (!job.waitForCompletion(true))
            System.out.println("main run stop!");
        else
            System.out.println("main run successfully!");
    }
}
6)UserCF_Step4.java
package recommend.code1.
import hadoop.myMapreduce.martrix.MainRun;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.lang.Math;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import recommend.code1.hdfs.HdfsDAO;

public class UserCF_Step4 {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String flag;       // which input this split comes from: "step3" or "data"
        private int itemNum = 7;   // number of items (101..107)

        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();  // tell the two datasets apart
            System.out.println(flag);
        }

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = UserCFHadoop.DELIMITER.split(values.toString());
            int itemIndex = 100;
            if (flag.equals("step3")) {
                // step3 record: user, neighbour1, sim1, neighbour2, sim2
                for (int i = 1; i <= itemNum; i++) {
                    Text k = new Text(Integer.toString(itemIndex + i));                        // itemid
                    Text v = new Text("A:" + tokens[0] + "," + tokens[1] + "," + tokens[3]);   // user + two neighbours
                    context.write(k, v);
                    System.out.println(k.toString() + "  " + v.toString());
                }
            } else if (flag.equals("data")) {
                Text k = new Text(tokens[1]);                          // itemid
                Text v = new Text("B:" + tokens[0] + "," + tokens[2]); // userid + score
                context.write(k, v);
                System.out.println(k.toString() + "  " + v.toString());
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<String, String> mapA = new HashMap<String, String>();  // user -> its two neighbours
            Map<String, String> mapB = new HashMap<String, String>();  // user -> score for this item
            for (Text line : values) {
                String val = line.toString();
                if (val.startsWith("A:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1] + "," + kv[2]);
                } else if (val.startsWith("B:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);
                }
            }
            Iterator<String> iterA = mapA.keySet().iterator();
            while (iterA.hasNext()) {
                String userId = iterA.next();
                if (!mapB.containsKey(userId)) {  // only items the user has not already bought are candidates
                    String simiStr = mapA.get(userId);
                    String[] simi = MainRun.DELIMITER.split(simiStr);
                    if (simi.length >= 2) {
                        double simiVal1 = mapB.containsKey(simi[0]) ? Double.parseDouble(mapB.get(simi[0])) : 0;
                        double simiVal2 = mapB.containsKey(simi[1]) ? Double.parseDouble(mapB.get(simi[1])) : 0;
                        double score = (simiVal1 + simiVal2) / 2;
                        context.write(new Text(userId), new Text(key.toString() + "," + String.format("%.2f", score)));
                    }
                }
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = UserCFHadoop.config();
        String input1 = path.get("input1_step4");
        String input2 = path.get("input2_step4");
        String output = path.get("output_step4");

        Job job = Job.getInstance(conf, "UserCF_Step4 job");
        job.setJarByClass(UserCF_Step4.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));  // load the two input datasets
        FileOutputFormat.setOutputPath(job, new Path(output));
        System.out.println("input1: " + input1);
        System.out.println("input2: " + input2);
        System.out.println("output: " + output);
        if (!job.waitForCompletion(true))
            System.out.println("main run stop!");
        else
            System.out.println("main run successfully!");
    }
}
7)UserCF_Step5.java
package recommend.code1.
import hadoop.myMapreduce.martrix.MainRun;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.lang.Math;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import recommend.code1.hdfs.HdfsDAO;

public class UserCF_Step5 {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = UserCFHadoop.DELIMITER.split(values.toString());
            if (tokens.length >= 3) {
                Text k = new Text(tokens[0]);                    // user
                Text v = new Text(tokens[1] + "," + tokens[2]);  // item + score
                context.write(k, v);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        private final int RECOMMENDER_NUM = 3;

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<Double, String> map = new HashMap<Double, String>();
            for (Text line : values) {
                String val = line.toString();
                String[] vlist = UserCFHadoop.DELIMITER.split(val);
                if (vlist.length >= 2)
                    map.put(Double.parseDouble(vlist[1]), vlist[0]);
            }
            List<Double> list = new ArrayList<Double>();
            Iterator<Double> iter = map.keySet().iterator();
            while (iter.hasNext()) {
                Double similarity = iter.next();
                list.add(similarity);
            }
            // sort the scores in descending order with a comparator
            Collections.sort(list, new Comparator<Double>() {
                public int compare(Double o1, Double o2) {
                    return o2.compareTo(o1);
                }
            });
            // keep at most RECOMMENDER_NUM items whose score is above 0.001
            String v = "";
            for (int i = 0; i < RECOMMENDER_NUM && i < list.size(); i++) {
                if (list.get(i).compareTo(new Double(0.001)) > 0)
                    v += "," + map.get(list.get(i)) + "[" + String.format("%.2f", list.get(i)) + "]";
            }
            if (!v.isEmpty())
                context.write(key, new Text(v.substring(1)));
            else
                context.write(key, new Text("none"));
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = UserCFHadoop.config();
        String input  = path.get("input_step5");
        String output = path.get("output_step5");

        Job job = Job.getInstance(conf, "UserCF_Step5 job");
        job.setJarByClass(UserCF_Step5.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        System.out.println("input : " + input);
        System.out.println("output: " + output);
        if (!job.waitForCompletion(true))
            System.out.println("main run stop!");
        else
            System.out.println("main run successfully!");
    }
}
8. Results of running the pipeline:
1) Raw data: item.csv
2) Step 1 output
3,3 2.5,2.5
3,2 2.5,2.0
3,1 2.5,5.0
3,5 2.5,4.0
3,4 2.5,5.0
2,3 2.0,2.5
2,2 2.0,2.0
2,1 2.0,5.0
2,5 2.0,4.0
2,4 2.0,5.0
1,3 5.0,2.5
1,2 5.0,2.0
1,1 5.0,5.0
1,5 5.0,4.0
1,4 5.0,5.0
5,3 4.0,2.5
5,2 4.0,2.0
5,1 4.0,5.0
5,5 4.0,4.0
5,4 4.0,5.0
4,3 5.0,2.5
4,2 5.0,2.0
4,1 5.0,5.0
4,5 5.0,4.0
4,4 5.0,5.0
2,2 2.5,2.5
2,1 2.5,3.0
2,5 2.5,3.0
1,2 3.0,2.5
1,1 3.0,3.0
1,5 3.0,3.0
5,2 3.0,2.5
5,1 3.0,3.0
5,5 3.0,3.0
2,2 5.0,5.0
2,1 5.0,2.5
2,5 5.0,2.0
2,4 5.0,3.0
1,2 2.5,5.0
1,1 2.5,2.5
1,5 2.5,2.0
1,4 2.5,3.0
5,2 2.0,5.0
5,1 2.0,2.5
5,5 2.0,2.0
5,4 2.0,3.0
4,2 3.0,5.0
4,1 3.0,2.5
4,5 3.0,2.0
4,4 3.0,3.0
3,3 4.0,4.0
3,2 4.0,2.0
3,5 4.0,4.0
3,4 4.0,4.5
2,3 2.0,4.0
2,2 2.0,2.0
2,5 2.0,4.0
2,4 2.0,4.5
5,3 4.0,4.0
5,2 4.0,2.0
5,5 4.0,4.0
5,4 4.0,4.5
4,3 4.5,4.0
4,2 4.5,2.0
4,5 4.5,4.0
4,4 4.5,4.5
3,3 4.5,4.5
3,5 4.5,3.5
5,3 3.5,4.5
5,5 3.5,3.5
5,5 4.0,4.0
5,4 4.0,4.0
4,5 4.0,4.0
4,4 4.0,4.0
3,3 5.0,5.0
3) Step 2 output
1,1 0.0000000
1,2 0.6076560
1,3 0.2857143
1,4 1.3333333
1,5 1.4164079
2,1 0.6076560
2,2 0.0000000
2,3 0.6532633
2,4 0.5568464
2,5 0.7761999
3,1 0.2857143
3,2 0.6532633
3,3 0.0000000
3,4 0.5634581
3,5 1.0703675
4,1 1.3333333
4,2 0.5568464
4,3 0.5634581
4,4 0.0000000
4,5 1.6000000
5,1 1.4164079
5,2 0.7761999
5,3 1.0703675
5,4 1.6000000
5,5 0.0000000
4) Step 3 output
1 5,1.4164079,4,1.3333333
2 5,0.7761999,3,0.6532633
3 5,1.0703675,2,0.6532633
4 5,1.6000000,1,1.3333333
5 4,1.6000000,1,1.4164079
5) Step 4 output
3 102,2.75
4 102,3.00
3 103,3.50
1 104,4.25
2 105,4.00
1 105,1.75
4 105,1.75
3 106,2.00
2 106,2.00
1 106,4.00
2 107,2.50
1 107,0.00
5 107,0.00
4 107,0.00
6) Step 5 output
1 104[4.25],106[4.00],105[1.75]
2 105[4.00],107[2.50],106[2.00]
3 103[3.50],102[2.75],106[2.00]
4 102[3.00],105[1.75]