Implementing Collaborative Filtering with Hadoop (example in <Mahout in Action> chapter 6), Part 1
I have been working through Mahout in Action lately and just finished Part 1. Chapter 6 contains an example of recommendation via collaborative filtering, but the book's version only handles Boolean input data. The full algorithm also ships as source code in the Mahout distribution, but that code is fairly dense to read, so I followed the book's example (whose logic is easier to trace) and wrote a version that handles non-Boolean (rated) input data. The approach and the code are described in detail below.
Input data:
Each line is a user ID, followed by an item ID and a preference value, separated by commas:
```
1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0
```
The first MR job consolidates each user's records into a single sparse vector, like this:
```
userid:1,vector:{103:2.5,102:3.0,101:5.0}
userid:2,vector:{104:2.0,103:5.0,102:2.5,101:2.0}
userid:3,vector:{107:5.0,105:4.5,104:4.0,101:2.5}
userid:4,vector:{106:4.0,104:4.5,103:3.0,101:5.0}
userid:5,vector:{106:4.0,105:3.5,104:4.0,103:2.0,102:3.0,101:4.0}
```
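The grouping that this first job performs can be sketched in plain Java, without Hadoop. This is a hypothetical standalone helper (class and method names are mine, not from the post); the shuffle phase plays the role of `computeIfAbsent` here:

```java
import java.util.HashMap;
import java.util.Map;

public class UserVectorSketch {
    // Group (user, item, pref) triples into one sparse "vector" per user,
    // mirroring what MR1's shuffle + reduce produces.
    public static Map<Long, Map<Integer, Float>> group(String[] lines) {
        Map<Long, Map<Integer, Float>> vectors = new HashMap<>();
        for (String line : lines) {
            String[] f = line.split(",");
            if (f.length != 3) continue;  // skip malformed records, as the mapper does
            long user = Long.parseLong(f[0]);
            vectors.computeIfAbsent(user, k -> new HashMap<>())
                   .put(Integer.parseInt(f[1]), Float.parseFloat(f[2]));
        }
        return vectors;
    }
}
```

In the real job the same grouping happens for free: the mapper emits the user ID as the key, and Hadoop delivers all of that user's preferences to one reduce call.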
The file holding the global constants, WiKiUtils.java:
```java
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

public class WiKiUtils {
    public static final String PATH = "hdfs://fansyonepc:9000/user/fansy/date1012/wikifirst/";
    public static int RECOMMENDATIONSPERUSER = 5;
    // used in WiKiReducer5's setup() to fetch the items the user has already rated
    public static String JOB1OUTPATH = PATH + "job1/part-r-00000";
}
```
WiKiDriver1.java:
```java
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class WiKiDriver1 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf1 = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WiKiDriver1 <in> <out>");
            System.exit(2);
        }
        Job job1 = new Job(conf1, "wiki job one");
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setNumReduceTasks(1);
        job1.setJarByClass(WiKiDriver1.class);
        job1.setMapperClass(WikiMapper1.class);
        job1.setMapOutputKeyClass(VarLongWritable.class);
        job1.setMapOutputValueClass(LongAndFloat.class);
        job1.setReducerClass(WiKiReducer1.class);
        job1.setOutputKeyClass(VarLongWritable.class);
        job1.setOutputValueClass(VectorWritable.class);
        FileInputFormat.addInputPath(job1, new Path("hdfs://fansyonepc:9000/user/fansy/input/" + otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1);  // exit on job failure
        }
    }
}
```

WiKiMapper1.java:
```java
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;

public class WikiMapper1 extends Mapper<LongWritable, Text, VarLongWritable, LongAndFloat> {

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        VarLongWritable userID = new VarLongWritable();
        LongWritable itemID = new LongWritable();
        FloatWritable itemValue = new FloatWritable();
        String[] info = value.toString().split(",");
        if (info.length != 3) {  // skip malformed lines
            return;
        }
        userID.set(Long.parseLong(info[0]));
        itemID.set(Long.parseLong(info[1]));
        itemValue.set(Float.parseFloat(info[2]));
        context.write(userID, new LongAndFloat(itemID, itemValue));
    }
}
```

WiKiReducer1.java:
```java
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WiKiReducer1 extends Reducer<VarLongWritable, LongAndFloat, VarLongWritable, VectorWritable> {

    public void reduce(VarLongWritable userID, Iterable<LongAndFloat> itemPrefs, Context context)
            throws IOException, InterruptedException {
        // RandomAccessSparseVector(int cardinality, int initialCapacity)
        Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
        for (LongAndFloat itemPref : itemPrefs) {
            userVector.set(Integer.parseInt(itemPref.getFirst().toString()),
                    Float.parseFloat(itemPref.getSecond().toString()));
        }
        context.write(userID, new VectorWritable(userVector));
    }
}
```

LongAndFloat.java, a Writable type used to store an (item ID, preference) pair:
```java
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;

public class LongAndFloat implements WritableComparable<LongAndFloat> {
    private LongWritable first;
    private FloatWritable second;

    public LongAndFloat() {
        set(new LongWritable(), new FloatWritable());
    }

    public LongAndFloat(LongWritable l, FloatWritable f) {
        set(l, f);
    }

    public void set(LongWritable first, FloatWritable second) {
        this.first = first;
        this.second = second;
    }

    public LongWritable getFirst() {
        return first;
    }

    public FloatWritable getSecond() {
        return second;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public int compareTo(LongAndFloat o) {
        int cmp = first.compareTo(o.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(o.second);
    }
}
```
The second MR:
Its input is MR(1)'s output. This job computes only the item co-occurrence: the user IDs are ignored, and all the items inside each user vector are paired up with each other. The output should look like the following:
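Conceptually, this second job counts, for every pair of items rated by the same user, how often they co-occur across all users. A hypothetical plain-Java sketch of that counting (names are mine, not from the post):

```java
import java.util.HashMap;
import java.util.Map;

public class CooccurrenceSketch {
    // Every ordered pair of items within one user's set co-occurs once;
    // summing over all users yields the item-item co-occurrence matrix.
    public static Map<Integer, Map<Integer, Integer>> count(int[][] userItems) {
        Map<Integer, Map<Integer, Integer>> cooc = new HashMap<>();
        for (int[] items : userItems) {
            for (int i : items) {
                for (int j : items) {
                    cooc.computeIfAbsent(i, k -> new HashMap<>())
                        .merge(j, 1, Integer::sum);
                }
            }
        }
        return cooc;
    }
}
```

Running this on the sample data's item sets reproduces the row for item 101 shown below: it pairs with itself 5 times (every user rated it), with 102 three times, and with 107 once.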
```
101,{107:1.0,106:2.0,105:2.0,104:4.0,103:4.0,102:3.0,101:5.0}
102,{106:1.0,105:1.0,104:2.0,103:3.0,102:3.0,101:3.0}
```

WiKiDriver2.java:
```java
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.*;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.math.VectorWritable;

public class WiKiDriver2 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf1 = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WiKiDriver2 <in> <out>");
            System.exit(2);
        }
        Job job1 = new Job(conf1, "wiki job two");
        job1.setNumReduceTasks(1);
        job1.setJarByClass(WiKiDriver2.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setMapperClass(WikiMapper2.class);
        job1.setMapOutputKeyClass(IntWritable.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setReducerClass(WiKiReducer2.class);
        job1.setOutputKeyClass(IntWritable.class);
        job1.setOutputValueClass(VectorWritable.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileInputFormat.addInputPath(job1, new Path(PATH + otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1);  // exit on job failure
        }
    }
}
```

WiKiMapper2.java:
```java
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WikiMapper2 extends Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable> {

    public void map(VarLongWritable userID, VectorWritable userVector, Context context)
            throws IOException, InterruptedException {
        // emit every ordered pair of items that co-occur in this user's vector
        Iterator<Vector.Element> it = userVector.get().iterateNonZero();
        while (it.hasNext()) {
            int index1 = it.next().index();
            Iterator<Vector.Element> it2 = userVector.get().iterateNonZero();
            while (it2.hasNext()) {
                int index2 = it2.next().index();
                context.write(new IntWritable(index1), new IntWritable(index2));
            }
        }
    }
}
```

WiKiReducer2.java:
```java
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WiKiReducer2 extends Reducer<IntWritable, IntWritable, IntWritable, VectorWritable> {

    public void reduce(IntWritable itemIndex1, Iterable<IntWritable> itemPrefs, Context context)
            throws IOException, InterruptedException {
        // count how often every other item co-occurs with itemIndex1
        Vector itemVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
        for (IntWritable itemPref : itemPrefs) {
            int itemIndex2 = itemPref.get();
            itemVector.set(itemIndex2, itemVector.get(itemIndex2) + 1.0);
        }
        context.write(itemIndex1, new VectorWritable(itemVector));
    }
}
```

The third MR:
This step uses two mapper jobs. The first one, MR(31), converts MR(2)'s output into VectorOrPrefWritable records;
the second one, MR(32), takes MR(1)'s output and, for each item in a user's vector, emits the (item ID, user ID) pair, also wrapped as VectorOrPrefWritable.

WiKiDriver31.java:
```java
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

public class WiKiDriver31 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf1 = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WiKiDriver31 <in> <out>");
            System.exit(2);
        }
        Job job1 = new Job(conf1, "wiki job three1");
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setNumReduceTasks(1);
        job1.setJarByClass(WiKiDriver31.class);
        job1.setMapperClass(WikiMapper31.class);
        job1.setMapOutputKeyClass(IntWritable.class);
        job1.setMapOutputValueClass(VectorOrPrefWritable.class);
        // a reducer is set only so that SequenceFileOutputFormat can be used
        job1.setReducerClass(WiKiReducer31.class);
        job1.setOutputKeyClass(IntWritable.class);
        job1.setOutputValueClass(VectorOrPrefWritable.class);
        // this job's input is MR2's output
        SequenceFileInputFormat.addInputPath(job1, new Path(PATH + otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1);  // exit on job failure
        }
    }
}
```

WiKiMapper31.java:
```java
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VectorWritable;

public class WikiMapper31 extends Mapper<IntWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

    public void map(IntWritable key, VectorWritable value, Context context)
            throws IOException, InterruptedException {
        // wrap each item's co-occurrence column in a VectorOrPrefWritable
        context.write(key, new VectorOrPrefWritable(value.get()));
    }
}
```

WiKiReducer31.java:
```java
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

public class WiKiReducer31 extends Reducer<IntWritable, VectorOrPrefWritable, IntWritable, VectorOrPrefWritable> {

    public void reduce(IntWritable key, Iterable<VectorOrPrefWritable> values, Context context)
            throws IOException, InterruptedException {
        // identity reduce: pass every value through unchanged
        for (VectorOrPrefWritable va : values) {
            context.write(key, va);
        }
    }
}
```

WiKiDriver32.java:
```java
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

public class WiKiDriver32 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf1 = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WiKiDriver32 <in> <out>");
            System.exit(2);
        }
        Job job1 = new Job(conf1, "wiki job three2");
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setNumReduceTasks(1);
        job1.setJarByClass(WiKiDriver32.class);
        job1.setMapperClass(WikiMapper32.class);
        job1.setMapOutputKeyClass(IntWritable.class);
        job1.setMapOutputValueClass(VectorOrPrefWritable.class);
        job1.setReducerClass(WiKiReducer32.class);
        job1.setOutputKeyClass(IntWritable.class);
        job1.setOutputValueClass(VectorOrPrefWritable.class);
        // WiKiDriver1's output is this job's input
        SequenceFileInputFormat.addInputPath(job1, new Path(PATH + otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1);  // exit on job failure
        }
    }
}
```

WikiMapper32.java:
```java
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WikiMapper32 extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

    public void map(VarLongWritable key, VectorWritable value, Context context)
            throws IOException, InterruptedException {
        long userID = key.get();
        Vector userVector = value.get();
        Iterator<Vector.Element> it = userVector.iterateNonZero();
        IntWritable itemi = new IntWritable();
        while (it.hasNext()) {
            // emit (itemID, (userID, preference)) for every item this user rated
            Vector.Element e = it.next();
            int itemIndex = e.index();
            float preferenceValue = (float) e.get();
            itemi.set(itemIndex);
            context.write(itemi, new VectorOrPrefWritable(userID, preferenceValue));
        }
    }
}
```
WiKiReducer32.java is identical to WiKiReducer31.java, so it is not listed again here.
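The value type shared by MR(31) and MR(32), VectorOrPrefWritable, is essentially a tagged union: each record holds either an item's co-occurrence column or a single (user ID, preference) pair, which is what lets both jobs' outputs be joined on the item ID later. A minimal hypothetical sketch of the idea in plain Java (not Mahout's actual class, and without the Writable serialization):

```java
// Hypothetical tagged-union record, mimicking the role of
// Mahout's VectorOrPrefWritable in this pipeline.
public class VectorOrPref {
    private final float[] vector;  // non-null only for the co-occurrence-column case
    private final long userID;     // meaningful only when vector == null
    private final float pref;

    // case 1: the record carries an item's co-occurrence column (MR31)
    public VectorOrPref(float[] vector) {
        this.vector = vector;
        this.userID = -1;
        this.pref = 0f;
    }

    // case 2: the record carries one user's preference for the item (MR32)
    public VectorOrPref(long userID, float pref) {
        this.vector = null;
        this.userID = userID;
        this.pref = pref;
    }

    public boolean holdsVector() { return vector != null; }
    public float[] getVector()   { return vector; }
    public long getUserID()      { return userID; }
    public float getPref()       { return pref; }
}
```

A consumer of these records (the next phase's reducer) checks which case it holds and dispatches accordingly.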
Continued in: Implementing Collaborative Filtering with Hadoop (example in <Mahout in Action> chapter 6), Part 2