博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
(一)MR编写之读取linux上(指定目录下的)配置文件
阅读量:6998 次
发布时间:2019-06-27

本文共 8930 字,大约阅读时间需要 29 分钟。

hot3.png

我首先把main函数和调用方式写一下

/** * * @param args * 1、传入参数 * 2、传出参数 * 3、业务参数:①stg_log_  ②stg_log_class_perform_ * 4、配置文件路径:如xetl.properties */public static void main(String[] args) {    if (args.length < 2) {        System.out.println("args must more than 2.");        System.exit(0);    }    // day=(args[0].split("/"))[3];    Configuration conf = new Configuration();    FileSystem hdfs = null;    try {        int res = ToolRunner.run(conf, new AutoActLogParseMr(), args);        System.exit(res);    } catch (Exception e) {        logger.error("", e);    }}

调用方式

hadoop jar( jar包路径和名字) com.whh.bigdata.xetl.mr.AutoActLogParseMr  ${srcParth} ${log_data}/${com_present_date_Y_m_d} 'stg_log_' ${BASE_PATH}/include/xetl.properties

数据线:hadoop fs -cp /log_data/2017-09-12/1600035/* /log_data/stg_log_1600035/day=2017-06-22/

下面倒序一下传入到MR的配置信息:

配置信息最终是要在map中使用的,所以传到map时是用map中的setup(Context context)方法的context,下面我来讲解一下setup方法:

protected void setup(Context context) throws IOException, InterruptedException {            try {                String mysqlUser = context.getConfiguration().get("mysqlUser");                String mysqlUrl = context.getConfiguration().get("mysqlUrl");                String mysqlPassword = context.getConfiguration().get("mysqlPassword");                String dbname = context.getConfiguration().get("dbname");                String string = context.getConfiguration().get("fs.allActs");                actMap = AutoActLogParseUtil.getHiveStaticConf(string,mysqlUrl,mysqlUser,mysqlPassword,dbname);            } catch (SQLException e) {                e.printStackTrace();            }            super.setup(context);        }        @Override        protected void cleanup(Context context)                throws IOException, InterruptedException {        }

***插播:在hadoop的源码中,基类Mapper类和Reducer类中都是只包含四个方法:setup方法,cleanup方法,run方法,map方法。

/**   * Expert users can override this method for more complete control over the   * execution of the Mapper.   * @param context   * @throws IOException   */  public void run(Context context) throws IOException, InterruptedException {        setup(context);       try {                 while (context.nextKeyValue()) {                         map(context.getCurrentKey(), context.getCurrentValue(), context);                 }       } finally {               cleanup(context);       }  }

可以看出,在run方法中调用了上面的三个方法:setup方法,map方法,cleanup方法。其中setup方法和cleanup方法默认是不做任何操作,且它们只被执行一次。但是setup方法一般会在map函数之前执行一些准备工作,如作业的一些配置信息等;cleanup方法则是在map方法运行完之后最后执行 的,该方法是完成一些结尾清理的工作,如:资源释放等。如果需要做一些配置和清理的工作,需要在Mapper/Reducer的子类中进行重写来实现相应的功能。map方法会在对应的子类中重新实现,就是我们自定义的map方法。该方法在一个while循环里面,表明该方法是执行很多次的。run方法就是每个maptask调用的方法。

那参数是怎么传到context的呢?

下面来看看run方法:

public int run(String[] params) throws Exception {        Configuration conf = getConf();        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");        Integer numReduceTasks = 3;        FileSystem hdfs = null;        try {            // 程序配置//            conf.set("fs.default.name", "hdfs://Galaxy");            //config.set("hadoop.job.ugi", "feng,111111");            //config.set("hadoop.tmp.dir", "/tmp/hadoop-fengClient");            //config.set("dfs.replication", "1");            //config.set("mapred.job.tracker", "master:9001");//            hdfs = FileSystem.get(new URI("hdfs://Galaxy"),//                    conf, "bigdata");            Path path = new Path("/log_data/");            hdfs = path.getFileSystem(conf);         //   logger.info("path 的值:" + path);            String flag=params[2];            acts = getOutPutName(hdfs, path, conf,flag);            conf.set("fs.allActs", acts);        } catch (Exception e) {            e.printStackTrace();        }        // acts = Hdfstools.readHDFSFile("/log_data/actId");      //  logger.info("acts的值为" + acts);        //获取配置文件信息        Config propertiesConfig = new Config();        propertiesConfig.init(params[3]);        String mysqlUrl = propertiesConfig.getValue("mysqlUrl");        String mysqlUser = propertiesConfig.getValue("mysqlUser");        String mysqlPassword = propertiesConfig.getValue("mysqlPassword");        String dbname = propertiesConfig.getValue("dbname");        conf.set("mysqlUser",mysqlUser);        conf.set("mysqlUrl",mysqlUrl);        conf.set("mysqlPassword",mysqlPassword);        conf.set("dbname",dbname);        Job job = Job.getInstance(conf);        job.setJarByClass(AutoActLogParseMr.class);        job.setMapperClass(AutoActLogParseMr.AutoActLogParseMaper.class);        job.setReducerClass(AutoActLogParseMr.AutoActLogParseReducer.class);        //将第一个路径参数作为输入参数        FileInputFormat.setInputPaths(job, new Path(params[0]));        //将第二个参数作为输出参数        FileOutputFormat.setOutputPath(job, new Path(params[1]));        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(Text.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(Text.class);        job.setNumReduceTasks(numReduceTasks);        String dirName[] = acts.split(Constant.MARK_AITE);        for (String a : dirName) {            MultipleOutputs.addNamedOutput(job, a, TextOutputFormat.class,                    NullWritable.class, Text.class);        }        logger.info("---excuter---");        return job.waitForCompletion(true) ? 0 : 1;    }

import org.apache.hadoop.conf.Configuration; 创建一个conf对象: Configuration conf = getConf();

//获取配置文件信息//Config 是自定义的一个类,实现了从配置文件获取数据的方法:        Config propertiesConfig = new Config();        propertiesConfig.init(params[3]);        String mysqlUrl = propertiesConfig.getValue("mysqlUrl");        String mysqlUser = propertiesConfig.getValue("mysqlUser");        String mysqlPassword = propertiesConfig.getValue("mysqlPassword");        String dbname = propertiesConfig.getValue("dbname");        conf.set("mysqlUser",mysqlUser); //再set给Configuration 对象        conf.set("mysqlUrl",mysqlUrl);        conf.set("mysqlPassword",mysqlPassword);        conf.set("dbname",dbname);        Job job = Job.getInstance(conf);

***插播:Config 是自定义的一个类,实现了从配置文件获取数据的方法,具体代码如下:

import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.io.*;import java.util.Properties;public class Config {    private static Log logger = LogFactory.getLog(Config.class);    private static Properties propertie=null;    private static FileInputStream inputFile=null;    private static FileOutputStream outputFile=null;       /**     * 必须首先使用该方法初始化系统配置,否则将会在调用Config.getValue(key,default)时获得默认值,而Config.getValue(key)将返回空     * @param configPathFile     */  public static void init(String configPathFile){        propertie = new Properties();        try {            InputStream input = new FileInputStream(new File(configPathFile));            if (input != null){                propertie.load(input);                input.close();            }        } catch (Exception ex) {            ex.printStackTrace();        }    }    public static void initJarFile(String configPathFile) {        propertie = new Properties();        try {            //读jar包根目录下的文件            InputStream input = Config.class.getClass().getResourceAsStream("/" + configPathFile);            if (input != null) {                propertie.load(input);                input.close();            }        } catch (Exception e) {            logger.error("read properties file error: ", e);        }    }    public static String getValue(String key) {        if (propertie!=null&&propertie.containsKey(key)) {            String value = propertie.getProperty(key);            try {				value=new String(value.getBytes("iso-8859-1"));			} catch (UnsupportedEncodingException e) {			}            return value;        } else            return "";    }    public static String getValue(String key, String defaultValue) {    	String v=null;       if(key!=null){    	   v=getValue(key);       }       if(v==null||v.trim().length()==0){    	   v=defaultValue;       }       return v;    }    public static void clear() {        propertie.clear();    }    public static void setValue(String key, String value) {        propertie.setProperty(key, value);    }    public static void saveFile(String fileName, String description) {        try {            outputFile = new FileOutputStream(fileName);            propertie.store(outputFile, description);            outputFile.close();        } catch (Exception e) {            logger.error(e);        }    }    public static void main(String[] args) {        Properties pps=System.getProperties();        //pps.list(System.out);        Config config = new Config();        config.initJarFile("mr_hbase.properties");        String numReduceTasksStr = config.getValue("numReduceTasks");        System.out.println(numReduceTasksStr);    }}

总结:配置文件的信息是通过MR的参数传入的,在run方法中经过org.apache.hadoop.conf.Configuration的对象的set方法传给Mapper类context,再经过setup()方法,进行赋值给Mapper类的类对象,供map使用。

转载于:https://my.oschina.net/u/3267050/blog/1817244

你可能感兴趣的文章