分享

hadoop源码解析之配置信息处理

zhuqitian 2017-2-27 09:09:22 发表于 其它 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 5283
翻笔记时发现了我的hadoop目录下存着很多东西,今天把源码解析里的部分共享出来,后续待我深入后继续分享

Share your knowledge with the world!
==============================
//目标:了解Configuration各成员变量的具体含义,
//Configuration类中的其他部分都是为了操作这些变量而实现的解析、设置、获取方法
//其中资源的加载主要是通过:addResource方法和addDefaultResource静态方法加载的,
// 前者是加载系统默认配置文件之外的资源,后者则是加载系统默认资源如:hdfs-default.xml
//hadoop的配置文件都是xml格式的,使用JAXP中的DOM方法处理xml,JAXP有两种处理xml的方法(SAX和DOM)
//因为hadoop的配置文件都比较小所以采用DOM文档树形式处理,SAX适合处理大的xml文件
package org.apache.hadoop.conf;

static{
  //print deprecation warning if hadoop-site.xml is found in classpath
  ClassLoader cL = Thread.currentThread().getContextClassLoader();
  if (cL == null) {
    cL = Configuration.class.getClassLoader();
  }
  if(cL.getResource("hadoop-site.xml")!=null) {
    LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
        "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
        + "mapred-site.xml and hdfs-site.xml to override properties of " +
        "core-default.xml, mapred-default.xml and hdfs-default.xml " +
        "respectively");
  }
  addDefaultResource("core-default.xml");
  addDefaultResource("core-site.xml");
}

// Property state used when loading configuration: getProps() rebuilds
// "properties" from the registered resources and then copies "overlay" on top.
private Properties properties;
private Properties overlay;
// Class loader that getResource(String) uses to locate named resources.
private ClassLoader classLoader;
{
  // Prefer the constructing thread's context class loader; fall back to the
  // loader that defined Configuration when the thread has none.
  final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
  classLoader = (contextLoader != null)
      ? contextLoader
      : Configuration.class.getClassLoader();
}


//老版本中addResource方法被重载了3次分别是:InputStream,Path,String,url。新版本中重载了6次
//此处用于加载classpath资源
public void addResource(String name) {
  addResourceObject(new Resource(name));
}
public void addResource(Configuration conf) {
  addResourceObject(new Resource(conf.getProps()));
}
public void addResource(InputStream in, String name) {
  addResourceObject(new Resource(in, name));
}
public void addResource(InputStream in) {
  addResourceObject(new Resource(in));
}
public void addResource(Path file) {
  addResourceObject(new Resource(file));
}
//加载系统外的资源
public void addResource(URL url) {
  addResourceObject(new Resource(url));
}


/**
* Get the {@link URL} for the named resource.
*
* @param name resource name.
* @return the url for the named resource.
*/
//将字符串转换为url,然后加载资源,这里资源指的不是配置文件了,如图像声音文本等,返回读取资源的url对象
public URL getResource(String name) {
  return classLoader.getResource(name);
}

//reloadConfiguration方法用于addDefaultResource方法中加载系统默认资源前清空properties和finalParameters
public synchronized void reloadConfiguration() {
  properties = null;                            // trigger reload
  finalParameters.clear();                      // clear site-limits
}


/**
* List of configuration resources.
*/
private ArrayList<Resource> resources = new ArrayList<Resource>();

//addResourceObject方法也会调用reloadConfiguration方法去清空properties和finalParameters
private synchronized void addResourceObject(Resource resource) {
  resources.add(resource);                      // add to resources
  reloadConfiguration();
}


/**
* List of default Resources. Resources are loaded in the order of the list
* entries
*/
// Static list of default resource names; addDefaultResource appends a name
// here the first time it is registered.
private static final CopyOnWriteArrayList<String> defaultResources =
  new CopyOnWriteArrayList<String>();
// Per-instance flag read by addDefaultResource: only instances with this flag
// set are reloaded when a new default resource is registered.
private boolean loadDefaults = true;

/**
* Configuration objects
*/
// addDefaultResource iterates over the keys of this map to reach the
// Configuration instances it needs to reload.
private static final WeakHashMap<Configuration,Object> REGISTRY =
  new WeakHashMap<Configuration,Object>();

/**
* Reload configuration from previously added resources.
*
* This method will clear all the configuration read from the added
* resources, and final parameters. This will make the resources to
* be read again before accessing the values. Values that are added
* via set methods will overlay values read from the resources.
*/
//reloadConfiguration方法用于addDefaultResource方法中加载系统默认资源前清空properties和finalParameters
public synchronized void reloadConfiguration() {
  properties = null;                            // trigger reload
  finalParameters.clear();                      // clear site-limits
}

/**
* List of default Resources. Resources are loaded in the order of the list
* entries
*/
// Static list of default resource names; addDefaultResource appends a name
// here the first time it is registered.
private static final CopyOnWriteArrayList<String> defaultResources =
  new CopyOnWriteArrayList<String>();

/**
* Add a default resource. Resources are loaded in the order of the resources
* added.
* @param name file name. File should be present in the classpath.
*/
//加载系统默认资源,在hdfs中,会把hdfs-defaule.xml和hdfs-site.xml作为默认资源
//并通过addDefaultResource方法保存在成员变量defaultResource中
//在mapreuce中,会把mapred-default.xml和mapred-site.xml作为默认资源
//synchronized,该方法在hadoop生命周期中只加载一次
//再次通过defaultResource变量是否已经包含配置文件名称去判断。如果不包含就加载
//还会通过loadDefaults布尔变量去标志配置文件是否是系统默认配置文件,如果是才会加载
//加载资源并不是马上加载,而是调用reloadConfiguration方法清空properties和parametees,其实就是触发资源的重新加载
//资源的重新加载是以静态成员REGISTRY作为媒介进行的,REGISTY静态成员中记录了系统中所有的Configuration对象
public static synchronized void addDefaultResource(String name) {
  if(!defaultResources.contains(name)) {
    defaultResources.add(name);
    for(Configuration conf : REGISTRY.keySet()) {
      if(conf.loadDefaults) {
        conf.reloadConfiguration();
      }
    }
  }
}

比如HdfsConfiguration类中就使用这个方法加载系统默认的配置文件
package org.apache.hadoop.hdfs;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DeprecatedKeys;

/**
* Adds deprecated keys into the configuration.
*/
@InterfaceAudience.Private
public class HdfsConfiguration extends Configuration {
  //加载系统默认资源
  static {
    addDeprecatedKeys();

    // adds the default resources
    Configuration.addDefaultResource("hdfs-default.xml");
    Configuration.addDefaultResource("hdfs-site.xml");
  }


  //延迟加载的设计模式:当真正需要配置数据的时候才会解析配置文件
  //addDefaultResource方法会通过reloadConfiguration清空properties和finalParameters(均不是静态成员)
protected synchronized Properties getProps() {
  if (properties == null) {
    properties = new Properties();
    Map<String, String[]> backup =
        new ConcurrentHashMap<String, String[]>(updatingResource);
    loadResources(properties, resources, quietmode);

    if (overlay != null) {
      properties.putAll(overlay);
      for (Map.Entry<Object,Object> item: overlay.entrySet()) {
        String key = (String)item.getKey();
        String[] source = backup.get(key);
        if(source != null) {
          updatingResource.put(key, source);
        }
      }
    }
  }
  return properties;
}


// Parses one resource and loads the properties it defines. Outline:
//   1. name starts as UNKNOWN_RESOURCE and is then taken from the wrapper.
//   2. A DocumentBuilderFactory is created for DOM parsing and configured to
//      ignore comments, be namespace aware and, where supported, be XInclude
//      aware (so a configuration file may include other XML files).
//   3. A DocumentBuilder is obtained and the resource is handled by its
//      runtime type: URL, String (classpath), Path (local file) and
//      InputStream are parsed; Properties are overlaid directly; an Element
//      is used as the root as-is.
//   4. The DOM tree is walked and each property is passed to loadProperty.
//
// Returns a new Resource wrapping the parsed properties when the input was an
// InputStream (returnCachedProperties); otherwise returns null. Also returns
// null when nothing could be parsed and quiet is true.
// Throws RuntimeException when the resource is not found (and quiet is false)
// or when parsing fails.
// NOTE(review): the factory is not given any DTD / external-entity
// restrictions in this method - confirm the inputs are trusted.
private Resource loadResource(Properties properties, Resource wrapper, boolean quiet) {
  String name = UNKNOWN_RESOURCE;
  try {
    Object resource = wrapper.getResource();
    name = wrapper.getName();

    DocumentBuilderFactory docBuilderFactory
      = DocumentBuilderFactory.newInstance();
    //ignore all comments inside the xml file
    docBuilderFactory.setIgnoringComments(true);

    //allow includes in the xml file
    docBuilderFactory.setNamespaceAware(true);
    try {
        docBuilderFactory.setXIncludeAware(true);
    } catch (UnsupportedOperationException e) {
      // XInclude support is optional: the failure is logged and loading continues
      LOG.error("Failed to set setXIncludeAware(true) for parser "
              + docBuilderFactory
              + ":" + e,
              e);
    }
    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
    Document doc = null;
    Element root = null;
    // set only for InputStream resources; see the end of the method
    boolean returnCachedProperties = false;

    if (resource instanceof URL) {                  // an URL resource
      doc = parse(builder, (URL)resource);
    } else if (resource instanceof String) {        // a CLASSPATH resource
      URL url = getResource((String)resource);
      doc = parse(builder, url);
    } else if (resource instanceof Path) {          // a file resource
      // Can't use FileSystem API or we get an infinite loop
      // since FileSystem uses Configuration API.  Use java.io.File instead.
      File file = new File(((Path)resource).toUri().getPath())
        .getAbsoluteFile();
      if (file.exists()) {
        if (!quiet) {
          LOG.debug("parsing File " + file);
        }
        doc = parse(builder, new BufferedInputStream(
            new FileInputStream(file)), ((Path)resource).toString());
      }
    } else if (resource instanceof InputStream) {
      doc = parse(builder, (InputStream) resource, null);
      // parsed values go into a separate Properties that is returned as a new
      // Resource (presumably because the stream cannot be read a second time)
      returnCachedProperties = true;
    } else if (resource instanceof Properties) {
      overlay(properties, (Properties)resource);
    } else if (resource instanceof Element) {
      root = (Element)resource;
    }

    // No root yet: take it from the parsed document, or give up if there is
    // no document either (silently when quiet, otherwise with an exception).
    if (root == null) {
      if (doc == null) {
        if (quiet) {
          return null;
        }
        throw new RuntimeException(resource + " not found");
      }
      root = doc.getDocumentElement();
    }
    Properties toAddTo = properties;
    if(returnCachedProperties) {
      toAddTo = new Properties();
    }
    // the root element is expected to be <configuration>; a mismatch is only logged
    if (!"configuration".equals(root.getTagName()))
      LOG.fatal("bad conf file: top-level element not <configuration>");
    // iterate over the root's child nodes, skipping anything that is not an Element
    NodeList props = root.getChildNodes();
    DeprecationContext deprecations = deprecationContext.get();
    for (int i = 0; i < props.getLength(); i++) {
      Node propNode = props.item(i);
      if (!(propNode instanceof Element))
        continue;
      Element prop = (Element)propNode;
      // a nested <configuration> element is loaded recursively, under the
      // same resource name
      if ("configuration".equals(prop.getTagName())) {
        loadResource(toAddTo, new Resource(prop, name), quiet);
        continue;
      }
      // any other element is expected to be <property>; a mismatch is logged
      // and the element is still processed
      if (!"property".equals(prop.getTagName()))
        LOG.warn("bad conf file: element not <property>");

      String attr = null;
      String value = null;
      boolean finalParameter = false;
      LinkedList<String> source = new LinkedList<String>();

      // first read name / value / final / source given as XML attributes
      Attr propAttr = prop.getAttributeNode("name");
      if (propAttr != null)
        attr = StringInterner.weakIntern(propAttr.getValue());
      propAttr = prop.getAttributeNode("value");
      if (propAttr != null)
        value = StringInterner.weakIntern(propAttr.getValue());
      propAttr = prop.getAttributeNode("final");
      if (propAttr != null)
        finalParameter = "true".equals(propAttr.getValue());
      propAttr = prop.getAttributeNode("source");
      if (propAttr != null)
        source.add(StringInterner.weakIntern(propAttr.getValue()));

      // then scan the child elements for <name>, <value>, <final> and
      // <source>: name / value / final found here replace the attribute
      // values read above, source entries are appended. Only <name> is trimmed.
      NodeList fields = prop.getChildNodes();
      for (int j = 0; j < fields.getLength(); j++) {
        Node fieldNode = fields.item(j);
        if (!(fieldNode instanceof Element))
          continue;
        Element field = (Element)fieldNode;
        if ("name".equals(field.getTagName()) && field.hasChildNodes())
          attr = StringInterner.weakIntern(
              ((Text)field.getFirstChild()).getData().trim());
        if ("value".equals(field.getTagName()) && field.hasChildNodes())
          value = StringInterner.weakIntern(
              ((Text)field.getFirstChild()).getData());
        if ("final".equals(field.getTagName()) && field.hasChildNodes())
          finalParameter = "true".equals(((Text)field.getFirstChild()).getData());
        if ("source".equals(field.getTagName()) && field.hasChildNodes())
          source.add(StringInterner.weakIntern(
              ((Text)field.getFirstChild()).getData()));
      }
      // the resource currently being loaded is always the last source entry
      source.add(name);

      // Ignore this parameter if it has already been marked as 'final'
      // (presumably enforced inside loadProperty - not visible here)
      if (attr != null) {
        if (deprecations.getDeprecatedKeyMap().containsKey(attr)) {
          DeprecatedKeyInfo keyInfo =
              deprecations.getDeprecatedKeyMap().get(attr);
          keyInfo.clearAccessed();
          for (String key:keyInfo.newKeys) {
            // update new keys with deprecated key's value
            loadProperty(toAddTo, name, key, value, finalParameter,
                source.toArray(new String[source.size()]));
          }
        }
        else {
          loadProperty(toAddTo, name, attr, value, finalParameter,
              source.toArray(new String[source.size()]));
        }
      }
    }

    // InputStream case: merge the separately collected properties into the
    // caller's properties and hand them back wrapped in a new Resource
    if (returnCachedProperties) {
      overlay(properties, toAddTo);
      return new Resource(toAddTo, name);
    }
    return null;
  } catch (IOException e) {
    LOG.fatal("error parsing conf " + name, e);
    throw new RuntimeException(e);
  } catch (DOMException e) {
    LOG.fatal("error parsing conf " + name, e);
    throw new RuntimeException(e);
  } catch (SAXException e) {
    LOG.fatal("error parsing conf " + name, e);
    throw new RuntimeException(e);
  } catch (ParserConfigurationException e) {
    LOG.fatal("error parsing conf " + name , e);
    throw new RuntimeException(e);
  }
}


//get方法会调用substitueVars方法,该方法会完成而配置的属性扩展,其实就是正则解析,比如value中可以传入$name
//而这个$name又是源于另一property的name,进行字符串替换
public String get(String name) {
  String[] names = handleDeprecation(deprecationContext.get(), name);
  String result = null;
  for(String n : names) {
    result = substituteVars(getProps().getProperty(n));
  }
  return result;
}

// Upper bound on the number of substitution rounds substituteVars performs
// for a single value before giving up.
private static final int MAX_SUBST = 20;

/**
* Attempts to repeatedly expand the value {@code expr} by replacing the
* left-most substring of the form "${var}" in the following precedence order
* <ol>
*   <li>if var has the form "env.NAME": by the value of the environment
*       variable NAME (looked up via getenv). "env.NAME:-default" falls back
*       to the default when the variable is unset or empty, "env.NAME-default"
*       when it is unset</li>
*   <li>otherwise by the value of the Java system property "var" if defined</li>
*   <li>by the raw value of the configuration key "var" if defined</li>
* </ol>
*
* If var is unbound the current state of expansion "prefix${var}suffix" is
* returned.
* <p>
* This function also detects self-referential substitutions, i.e.
* <pre>
*   {@code
*   foo.bar = ${foo.bar}
*   }
* </pre>
* If a cycle is detected then the original expr is returned. Loops
* involving multiple substitutions are not detected.
*
* @param expr the literal value of a config key
* @return null if expr is null, otherwise the value resulting from expanding
* expr using the algorithm above.
* @throws IllegalStateException when more than
* {@link Configuration#MAX_SUBST} replacements are required
*/
// Property expansion. Per the original author's note, older versions used
// regular-expression matching; this version scans the string directly.
private String substituteVars(String expr) {
  if (expr == null) {
    return null;
  }
  String eval = expr;
  for(int s = 0; s < MAX_SUBST; s++) {
    // bounds of the variable name inside the left-most "${...}", or -1 if none
    final int[] varBounds = findSubVariable(eval);
    if (varBounds[SUB_START_IDX] == -1) {
      return eval;
    }
    final String var = eval.substring(varBounds[SUB_START_IDX],
        varBounds[SUB_END_IDX]);
    String val = null;
    try {
      if (var.startsWith("env.") && 4 < var.length()) {
        // environment lookup: v is the text after the "env." prefix
        String v = var.substring(4);
        int i = 0;
        for (; i < v.length(); i++) {
          char c = v.charAt(i);
          if (c == ':' && i < v.length() - 1 && v.charAt(i + 1) == '-') {
            // "NAME:-default": use the default when NAME is unset or empty
            val = getenv(v.substring(0, i));    // look up the variable named before ":-"
            if (val == null || val.length() == 0) {
              val = v.substring(i + 2);
            }
            break;
          } else if (c == '-') {
            // "NAME-default": use the default only when NAME is unset
            val = getenv(v.substring(0, i));
            if (val == null) {
              val = v.substring(i + 1);
            }
            break;
          }
        }
        // loop ran to the end without finding a default marker: plain lookup
        if (i == v.length()) {
          val = getenv(v);
        }
      } else {
        val = getProperty(var);
      }
    } catch(SecurityException se) {
      LOG.warn("Unexpected SecurityException in Configuration", se);
    }
    // nothing found so far: fall back to the configuration's own raw value
    if (val == null) {
      val = getRaw(var);
    }
    if (val == null) {
      return eval; // return literal ${var}: var is unbound
    }

    // positions of the enclosing "${" and of the character after "}"
    final int dollar = varBounds[SUB_START_IDX] - "${".length();
    final int afterRightBrace = varBounds[SUB_END_IDX] + "}".length();
    final String refVar = eval.substring(dollar, afterRightBrace);

    // detect self-referential values
    if (val.contains(refVar)) {
      return expr; // return original expression if there is a loop
    }

    // substitute
    eval = eval.substring(0, dollar)
           + val
           + eval.substring(afterRightBrace);
  }
  // MAX_SUBST rounds were used up and a "${...}" was still being expanded
  throw new IllegalStateException("Variable substitution depth too large: "
                                  + MAX_SUBST + " " + expr);
}

//substituteVars方法中,调用了getProperty这个方法,这一步保证了首先使用系统属性做属性扩展
//比如在编译hadoop源码时可以这么来:-Dname=value 这个属于系统级,优先级别最高
String getProperty(String key) {
  return System.getProperty(key);
}

================================接口
// Interface whose behavior is supplied entirely by implementors: it lets an
// object receive a Configuration (for example to finish initializing itself)
// and hand it back on request.
/** Something that may be configured with a {@link Configuration}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Configurable {

  /** Set the configuration to be used by this object. */
  void setConf(Configuration conf);

  /** Return the configuration used by this object. */
  Configuration getConf();
}


================================总结

hadoop的配置文件管理采用的是JAXP的DOM解析xml
其实配置文件管理的类里(Configuration),就是get* 和 set*
get*会比较复杂,set*会简单很多,就是对一些像properties,overlay,finalParameters等成员变量的setProperty(),保存传入的键值对
配置系统是复杂软件必不可少的部分,其中资源加载,资源合并,和属性扩展等都是比较重要的处理过程

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条