分享

ZooKeeper实例—进程调度系统实践

pig2 2014-1-27 23:23:30 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 9996
一、背景:

用户不断向Hadoop集群中提交Job,在集群运行过程提交的任务状态:等待、执行、执行成功、执行失败

现在使用ZooKeeper对其进行集中管理,让第一次处理失败的任务回调后再次执行.当多次执行失败,系统将

认定此任务存在错误,停止对此任务的回调并将其保存下来,同时保存下成功的任务.

二、设计方案

设定节点/root为所有被存储在节点的根节点,并设置四个存储不同状态的节点:/root/wait  /root/processed  /root/temp  /root/error

节点名称为提交该任务时产生的任务编号(JobID),存储的值为对应的回调操作的shell指令

三、设计实现

①读取配置文件的工具类
/**
 * Reads startup configuration values from a .properties file.
 *
 * Used by the scheduling server and the monitor to load the ZooKeeper
 * connection string, session timeout, and Hadoop settings.
 */
public class InitConfReader {
    // Path of the properties file to load (e.g. "init.properties").
    private String confFileUrl;

    /**
     * @param confFileUrl path of the properties file this reader loads
     */
    public InitConfReader(String confFileUrl) {
        this.confFileUrl = confFileUrl;
    }

    /**
     * Looks up the requested keys in the configuration file.
     *
     * @param keys configuration keys to read
     * @return map from each requested key to its value; a key missing from
     *         the file (or an unreadable file) maps to {@code null}
     */
    public Map<String, String> getConfs(List<String> keys) {
        Map<String, String> result = new HashMap<String, String>();
        Properties properties = new Properties();
        // try-with-resources closes the reader; the original leaked it.
        try (FileReader fileReader = new FileReader(new File(confFileUrl))) {
            // Read the configuration used as the server's startup parameters.
            properties.load(fileReader);
        } catch (IOException e) {
            // Best-effort, matching the original: log and fall through so the
            // caller still gets a map (with null values) rather than a crash.
            e.printStackTrace();
        }
        for (String key : keys) {
            result.put(key, properties.getProperty(key));
        }
        return result;
    }
}

②初始化与维护原始数据节点

public class SchedulingServer implments Watcher{
private ZooKeeper zooKeeper;
privae String connectString; //conectString连接字符串,包括IP地址,服务器端口号
private int  session Timeout;
public void initConf() thows Exception{
InitConfReader reader=new InitConfReader(“init.properties”);
List<String>keys=new ArrayList<String>();
keys.add(“connectSring”);
keys.add(“sessionTimeout”);
Map<String  ,String>confs=reader.getConfs(keys);
this.connectString=conf.get(“connectString”);
this.sessionTimeout=Integer.parseInt(confs.get(“sessionTimeout”));
zooKeeper=new ZooKeeper(connectString, sessionTimeout ,this);
}
/*
1.整个系统中的所有静态节点均被创建。
2.“/root”节点被创建,“/root/client”未被创建
3.“/root/client”被创建但其下子节点一个或多个未被创建.
4.存储状态的一个或多个节点未被创建
*/
public void initServer() throws  Exception{
//stat用于存储被监测节点是否存在,若不存在则对应的值为null
Stat stat=zooKeeprr.exits(“/root” , false);
if(stat==null){
//根节点
zooKeeper.create(“/root”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
//失败任务存储节点
zooKeeper.create(“/root/error”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
//成功任务存储节点
zooKeeper.create(“/root/processed”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
//等待和正在运行任务存储节点
zooKeeper.create(“/root/wait”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
//临时存储第一次处理失败的节点
zooKeeper.create(“/root/temp”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
}
stat=zooKeeper.exists(“root/error”, false);
if(stat==null){
zooKeeper.create(“/root/error”,null,Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISITENT);
}
stat=zooKeeper.exists(“/root/processed”,false);
if(stat==null){
zooKeeper.create(“/root/processed”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
}
stat=zooKeeper.exists(“root/wait”,false);
if(stat==null){
zooKeeper.create(“/root/wait”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
}
}
public void process(WatchedEvent event){
}
}

③检测节点中存储的任务编号所对应的Job运行状态

public  class ServerMonitor implements Watcher,Runnable{

private ZooKeeper  zooKeeper;

private String connectString;

private int session Timeout;

private String hadoopHome;

private String mapredJob Trachker;

//初始化文件加载,并用其内容配置ZooKeeper服务器的连接

public void initConf() throws  Exeception{

InitConfReader reader=new InitConfReader(“init.properties”);

List<String> keys=new ArrayList<String>();

keys.add(“connectString”);

keys.add(“sessionTimeout”);

keys.add(“hadoopHome”);

keys.add(“mapred.job.tracher”);

Map<String , String>confs=reader.getConfs(keys);

this.hadoopHome=confs.get(“hadoopHome”);

this.mapredJobTracker=confs.get(“mapred.job.tracker”);

zooKeeper=new ZooKeeper(connectString,sessionTimeout,this);

//监视节点中存储的任务状态变化

public ServerMonitor() throws Exeeption{

SchedulingServer schedulingServer=new SchedulingServer();

schedulingServer.initConf();

schedulingServer.initServer();

public void process(WatchedEvent   event){ }

/* 一个任务可能出于:等待,运行,成功,失败,杀死等状态中的一个

1.任务出于等待或运行状态,不做任何操作,继续检测任务状态,知道状态发生变化

2.任务出于成功状态,从”/root/client/wait”中删除,并将其插入到”/root/clients/processed”当中,并停止对该节点进行检测

3.程序第一次出于失败或杀死状态,将任务插入“/root/client/temp”中,并回调,如果连续两次都是失败或被杀死,则将其插入”/root/client/error”并停止对此任务的检测。

public void monitorNode() throws  Exception{

List<String>waits=zooKeeper.getChildren(“/root/clien/wait” ,false);

JobConf conf=new JobConf();

conf.set(“mapred.job.tracker” , mapredJobTracker);

JobClient  jobClient=new  JobClient(conf);

for(String wait:waits){

String data=new String(zooKeeper.getdata(“/root/client/wait/”+wait,false,null);

JobID jobid=null; try{

jobid=JobID.forName(wait);

System.out.println(“hob id  is wrong!!!”) ;

Stat stat=zooKeeper.exists(“/root/client/error/”+wait,false);

zooKeeper.delete(“/root/client/error/”+wait,-1);     }

zooKeeper.delete(“/root/client/wait/”+wait,-1);

zooKeeper.create(“/root/client/error/”+wait,data,getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);  continus; }

//通过任务的JOBID来检测任务正在处于的状态

int runStat=jobClient.getJob(org.apache.hadoop.mapred.JobID)jobid).getJobState();

//处于等待和运行状态的任务在状态不发生改变前不做处理

case JobStatus.RUNNING:

//当任务执行成功后,删除原”/root/wait”目录下的节点并将其任务信息插入到“/root/wait/processed”

case JobStatus.SUCCEEDED:

zooKeeper.delete(“/root/client/wait/”+wait,-1);

zooKeeper.create(“/root/client/processed/”+wait,data.getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

List<String>tempNodes=zooKeeper.getChildren(“/root/client/temp”,false);

if(tempNodes==null || tempNodes.size()==0) { break; }

for(String tempNode:tempNodes){

if(new String(zooKeeper,getData(“/root/client/temp”+tempNode,false,null)).equals(data)){               zooKeeper.delete(“/root/client/temp/”+tempNode,-1); } } }

//当任务执行失败或者任务呗杀掉,将会把任务插入“/root/temp”下并回调任务,如果任务回调后失败,则将任务插入”root/error”

case  JobSatus.FAILED:

case JobStatus.KILLED:

zooKeeper.delete(“/root/client/wait/”+wait,-1);

tempNodes=zooKeeper.getChildren(“/root/client/temp”,false);

zooKeeper.create(“/root/client/temp”+wait,data.getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

if(tempNodes==null  ||  tempNode.size()==0){

shellTool.callBack(data,hadoopHome); }

for(String tempNode:tempNodes){

if(new String(zooKeeper.geData(“/root/client/temp”+temNode,false,null)).equals(data)){      zooKeeper.create(“/root/client/error/”+wait,data.getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

zooKeeper.delete(“/root/client/temp/”+wait, -1);

zooKeeper.delete(“/root/client/temp”+ tempNode, -1); flag=false; } }

ShellTool.callBack(data, hadoopHome); } }

public  void  run()  {

ServerMonitor serverWaitMonitor = new  ServerMonitor();

ServerWaitMonitor.monitorNode(); Thread.sleep(5000);

} } catch(Exception e){

e.printStackTrace(); } }

public static void  main(String[] args) throws Exception{

Thread thread =new  Thread(new  ServerMonitor());
来自群组: Hadoop技术组

已有(1)人评论

跳转到指定楼层
anyhuayong 发表于 2014-9-14 08:15:59
好文章,支持一下
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条