注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

长风明志的博客

不要也不能做下一个谁,应该且可以做第一个自己

 
 
 

日志

 
 

Hadoop多路输入  

2014-05-13 20:08:57|  分类: Hadoop学习 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
思路1:一个mapper对应多个input路径,需要mapper做判断是哪个路径(如:通过匹配路径中包含某个特定词语来判断)。
思路2:使用MultipleInputs,每个mapper对应相应输入路径,实验代码如下:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;




public class testMultipleInputs
{
  public static void main(String[] args) throws Exception
  {
if (args.length !=3)
{
System.err.println("Usage:testMultipleInputs <input path1> <input path2> <output path>");
System.exit(-1);
}
String inputPath1=args[0];
String inputPath2=args[1];
String outputPath=args[2];
    JobConf conf = new JobConf(testMultipleInputs.class);

    conf.setJobName("testMultipleInputs");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    conf.setOutputFormat(TextOutputFormat.class);
    
    MultipleInputs.addInputPath(conf,new Path(inputPath1),TextInputFormat.class,Map.class);
    MultipleInputs.addInputPath(conf,new Path(inputPath2),TextInputFormat.class,Map2.class);
    FileOutputFormat.setOutputPath(conf, new Path(outputPath));
    conf.setReducerClass(Reduce.class);

    conf.setNumReduceTasks(1);   //不设置的话默认为100

    int n = JobClient.runJobReturnExitCode(conf);
    System.out.println(n);
    if (n == 0)
      System.out.println("succeed!");
    else
      System.out.println("error!");
  }
  

  public static class Map extends MapReduceBase
  implements Mapper<LongWritable, Text, Text, Text>
  {

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException
    {

    String[] arr = value.toString().split("\t", -1);
    String newValue=arr[0]+"\t"+arr[1]+"\t"+arr[2];
    output.collect(new Text(arr[0]),new Text(newValue) );         //原样输出
    }
  }
  

  public static class Map2 extends MapReduceBase
  implements Mapper<LongWritable, Text, Text, Text>
  {

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException
    {
    String[] arr = value.toString().split("\t", -1);
    String newValue=arr[0]+ "\t"+ arr[2]+ "\t"+ arr[1];
    output.collect(new Text(arr[0]),new Text(newValue) );         //调换顺序后输出
    }
  }
  
  public static class Reduce extends MapReduceBase
  implements Reducer<Text, Text, Text, Text>
  {
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException
    {
      String line="";
      while (values.hasNext())
        line += values.next().toString()+"; ";
      output.collect(key, new Text(line));
    }
  }
}

  评论这张
 
阅读(987)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017