高防服务器

hadoop如何自定义格式化输出


hadoop如何自定义格式化输出

发布时间:2021-12-09 16:25:34 来源:高防服务器网 阅读:64 作者:小新 栏目:大数据

这篇文章给大家分享的是有关hadoop如何自定义格式化输出的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

import java.io.IOException;  import java.net.URI;  import org.apache.commons.logging.Log;  import org.apache.commons.logging.LogFactory;  import org.apache.hadoop.conf.Configuration;  import org.apache.hadoop.fs.FSDataOutputStream;  import org.apache.hadoop.fs.FileSystem;  import org.apache.hadoop.fs.Path;  import org.apache.hadoop.io.LongWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.Job;  import org.apache.hadoop.mapreduce.Mapper;  import org.apache.hadoop.mapreduce.RecordWriter;  import org.apache.hadoop.mapreduce.TaskAttemptContext;  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  public class CustomizeOutputFormat {  	static final Log LOG = LogFactory.getLog(CustomizeOutputFormat.class);  	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  		Configuration conf = new Configuration();  		Job job = Job.getInstance(conf);  		  		job.setJarByClass(CustomizeOutputFormat.class);  		job.setMapperClass(CustMapper.class);  		  		job.setMapOutputKeyClass(Text.class);  		job.setMapOutputValueClass(Text.class);  		//此处这只自定义的格式化输出  		job.setOutputFormatClass(CustOutputFormat.class);  		String jobName = "Customize outputformat test!";  		job.setJobName(jobName);  		FileInputFormat.addInputPath(job, new Path(args[0]));  		FileOutputFormat.setOutputPath(job, new Path(args[1]));  		  		boolean b = job.waitForCompletion(true);  		if(b) {  			LOG.info("Job "+ jobName +" is done.");  			  		}else {  			LOG.info("Job "+ jobName +"is going wrong,now exit.");  			System.exit(0);  		}  		  	}  }  class CustMapper extends Mapper<LongWritable, Text, Text, Text>{  	String[] textIn = null;  	Text outkey = new Text();  	Text outvalue = new Text();  	@Override  	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)  			throws IOException, InterruptedException {  		/**  		 * 假设文件的内容为如下:  		 * boys	girls  		 * firends goodbye  		 * down up  		 * fly to  		 * neibors that  		 *   		 */  		textIn = value.toString().split("t");  		outkey.set(textIn[0]);  		outvalue.set(textIn[1]);  		context.write(outkey, outvalue);		  	}	  }  //自定义OutoutFormat  class CustOutputFormat extends FileOutputFormat<Text, Text>{  	@Override  	public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {  		//获得configration  		Configuration conf = context.getConfiguration();  		//获得FileSystem  		FileSystem fs =  FileSystem.newInstance(conf);  		//获得输出路径  		Path path = CustOutputFormat.getOutputPath(context);  		URI uri = path.toUri();  		//创建两个文件,得到写入流  		FSDataOutputStream foa = fs.create(new Path(uri.toString()+"/out.a"));  		FSDataOutputStream fob = fs.create(new Path(uri.toString()+"/out.b"));	  		//创建自定义RecordWriter  传入 两个流  		CustRecordWriter rw = new CustRecordWriter(foa,fob);  		return rw;  		  	}  	  	  	class CustRecordWriter extends RecordWriter<Text, Text>{  		 FSDataOutputStream foa = null;  		 FSDataOutputStream fob = null;  		CustRecordWriter(FSDataOutputStream foa,FSDataOutputStream fob){  			this.foa = foa;  			this.fob = fob;  		}  		@Override  		public void write(Text key, Text value) throws IOException, InterruptedException {  			String mText  = key.toString();  			//根据可以长度的不同分别输入到不同的文件  			if(mText.length()>=5) {  				foa.writeUTF(mText+"t"+value.toString()+"n");  			}else {  				fob.writeUTF(mText+"t"+value.toString()+"n");  			}  		}  		@Override  		public void close(TaskAttemptContext context) throws IOException, InterruptedException {  			//最后将两个写入流关闭  			if(foa!=null) {  				foa.close();  			}  			if(fob!=null) {  				fob.close();  			}  		}  	}  	  }  //使用MultipleInputs,c处理多个来源的文件  package hgs.multipuleinput;  import java.io.IOException;  import org.apache.hadoop.conf.Configuration;  import org.apache.hadoop.fs.Path;  import org.apache.hadoop.io.NullWritable;  import org.apache.hadoop.mapreduce.Job;  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  import hgs.custsort.SortBean;  import hgs.custsort.SortDriver;  import hgs.custsort.SortMapper;  import hgs.custsort.SortReducer;  public class MultipuleInputDriver {  	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  		Configuration conf = new Configuration();  		Job job = Job.getInstance(conf);  		  		job.setJarByClass(SortDriver.class);  		job.setMapperClass(SortMapper.class);  		job.setReducerClass(SortReducer.class);  		job.setOutputKeyClass(SortBean.class);  		job.setOutputValueClass(NullWritable.class);  		  		MultipleInputs.addInputPath(job, new Path("/sort"), TextInputFormat.class,SortMapper.class);  		MultipleInputs.addInputPath(job, new Path("/sort1"), TextInputFormat.class,SortMapper.class);  		//FileInputFormat.setInputPaths(job, new Path("/sort"));  		FileOutputFormat.setOutputPath(job, new Path("/sortresult"));  		System.exit(job.waitForCompletion(true)==true?0:1);  	}  }

感谢各位的阅读!关于“hadoop如何自定义格式化输出”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

[微信提示:高防服务器能助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。

[图文来源于网络,不代表本站立场,如有侵权,请联系高防服务器网删除]
[