package com.demo.admin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Test extends Configured implements Tool {

    // Mapper: split each record on '$' and emit (name, (phone, shiqu))
    public static class TestMap extends Mapper<LongWritable, Text, Text, TestWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the record on '$' (escaped, since split() takes a regex)
            final String[] splited = value.toString().split("\\$");
            // Field 0 is the name; use it as the output key
            final Text k2 = new Text(splited[0]);
            // phone is field 4 and shiqu (city/district) is field 8
            final TestWritable v2 = new TestWritable(splited[4], splited[8]);
            // Emit name as the key and (phone, shiqu) as the value
            context.write(k2, v2);
        }
    }

    // Reducer: write out every (phone, shiqu) pair grouped under a name
    public static class TestReduce extends Reducer<Text, TestWritable, Text, TestWritable> {
        @Override
        public void reduce(Text k2, Iterable<TestWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // Loop over all values for this key
            for (TestWritable testWritable : v2s) {
                TestWritable v3 = new TestWritable(testWritable.phone, testWritable.shiqu);
                context.write(k2, v3);
            }
        }
    }

    // Launch the job through ToolRunner so generic options are parsed
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Test(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] argArray = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (argArray.length != 2) {
            System.out.println("Usage: Test <input path> <output path>");
            System.exit(1);
        }
        Job job = Job.getInstance(conf, "Test");
        // Remove the output directory if it already exists (recursive delete)
        FileSystem fs = FileSystem.get(new URI(argArray[1]), conf);
        fs.delete(new Path(argArray[1]), true);
        job.setJarByClass(Test.class);
        job.setMapperClass(TestMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TestWritable.class);
        job.setReducerClass(TestReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TestWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(argArray[0]));
        FileOutputFormat.setOutputPath(job, new Path(argArray[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Custom Writable carrying the phone and shiqu fields
    static class TestWritable implements Writable {
        String phone;
        String shiqu;

        public TestWritable(String phone, String shiqu) {
            this.phone = phone;
            this.shiqu = shiqu;
        }

        // No-arg constructor: a custom Writable must keep one, because
        // Hadoop instantiates it via reflection during deserialization.
        public TestWritable() {
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.phone = in.readUTF();
            this.shiqu = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(phone);
            out.writeUTF(shiqu);
        }

        @Override
        public String toString() {
            return phone + "\t" + shiqu;
        }
    }
}

Sample input file (test.txt):
张三$25$男$未婚$15997444444$409930360$中国$湖北$广水

Expected output (name, phone, shiqu, tab-separated):
张三	15997444444	广水

Shell commands:
/usr/local/hadoop/bin/hadoop fs -put /home/XX/test.txt /test_log/
/usr/local/hadoop/bin/hadoop jar /home/XX/test.jar /test_log/test.txt /test_cleaned/ 1>/dev/null
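Since the no-arg constructor is the subtle point here, a quick local round-trip check can confirm the serialization logic without running a cluster. This is only a sketch: TestWritableCheck is a hypothetical helper that is not part of the original code, and it assumes it sits in the same com.demo.admin package (so the package-private TestWritable is visible) with the Hadoop jars on the compile classpath.

package com.demo.admin;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TestWritableCheck {
    public static void main(String[] args) throws IOException {
        // Serialize a TestWritable into an in-memory byte stream
        Test.TestWritable before = new Test.TestWritable("15997444444", "广水");
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        before.write(new DataOutputStream(bytes));

        // Deserialize through the no-arg constructor, mirroring how Hadoop
        // instantiates the class via reflection before calling readFields()
        Test.TestWritable after = new Test.TestWritable();
        after.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(after); // expect: 15997444444	广水
    }
}

On the cluster, the job's result can be inspected with hadoop fs -cat /test_cleaned/part-r-00000 (assuming the default single reducer, so TextOutputFormat writes one part file).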