分享

豆瓣Redis解决方案Codis源码剖析:Dashboard

丫丫 发表于 2015-8-10 13:08:25 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 26859
本帖最后由 丫丫 于 2015-8-10 13:17 编辑

问题导读


1.Dashboard涉及到哪些知识?
2.如何启动Dashboard?
3.什么是Slot?
4.如何初始化Slot?
5.数据迁移实现了哪些特性?
6.对于迁移中的Slot,如果恰好此时有客户端要访问该Slot中的某个Key该怎么办?






1.不只是Dashboard
虽然名字叫Dashboard,但它在Codis中的作用却不可小觑。它不仅仅是Dashboard管理页面,更重要的是,它负责监控和指挥各个Proxy的负载均衡(数据分布和迁移)。并且,所有API都以RESTFul接口的形式对外提供,供Proxy和codis-config(Codis的命令行工具)调用。下面就来看一下数据分布和迁移的代码执行流程。
Dashboard涉及到的知识点比较多,包括Martini框架、Model模型层、数据的负载均衡分配、Redis中Slot的实现、ZooKeeper的Sequential结点实现消息队列、迁移过程中的数据访问等等。与此同时,本篇还记录了在研究过程中的发散思考,例如Java中的AR模型层。虽然内容稍有些庞杂,但都是真实的记录,相信于人于己都会有很大帮助。


2.Dashboard2.1 codis-config命令行
codis-config是cmd/cconfig下编译出的命令行工具,它以命令行的形式提供对Codis常用命令的支持,例如启动Dashboard、初始化slot以及migrate,后两者会被封装成REST请求发送到Dashboard执行。下面简单看一下main.go的源码:
[mw_shl_code=text,true]func
main() {
    ...
    args, err := docopt.Parse(usage,
nil, true, "codis config v0.1", true)
    if err != nil {
        log.Error(err)
    }

    // set config file
    var configFile string
    var config *cfg.Cfg
    if args["-c"] != nil {
        configFile = args["-c"].(string)
        config, err = utils.InitConfigFromFile(configFile)
        if err != nil {
            Fatal(err)
        }
    } else {
        config, err = utils.InitConfig()
        if err != nil {
            Fatal(err)
        }
    }

    // load global vars
    globalEnv = env.LoadCodisEnv(config)

    cmd := args["<command>"].(string)
    cmdArgs := args["<args>"].([]string)

    go http.ListenAndServe(":10086", nil)
    err = runCommand(cmd, cmdArgs)
}

func runCommand(cmd string, args []string) (err error) {
    argv := make([]string, 1)
    argv[0] = cmd
    argv = append(argv, args...)
    switch cmd {
    case "action":
        return errors.Trace(cmdAction(argv))
    case "dashboard":
        return errors.Trace(cmdDashboard(argv))
    case "server":
        return errors.Trace(cmdServer(argv))
    case "proxy":
        return errors.Trace(cmdProxy(argv))
    case "slot":
        return errors.Trace(cmdSlot(argv))
    }
    return errors.Errorf("%s is not a valid command. See 'codis-config -h'", cmd)
}[/mw_shl_code]


2.2 Dashboard启动
大家是否还记得,在安装Codis完之后,我们执行了smaple目录下的一系列脚本初始化Codis并启动服务,其中有一项./start_dashboard.sh就是启动Dashboard服务。它是调用的codis-config,传入的参数”dashboard”正好对应上面的main.go中的cmdDashboard()。
[mw_shl_code=text,true]#!/bin/sh

nohup ../bin/codis-config -c config.ini -L ./log/dashboard.log dashboard --addr=:
18087 --http-log=./log/requests.log &>/dev/null &[/mw_shl_code]


打开dashboard.go就能看到cmdDashboard()的实现,它调用的是下面的runDashboard()函数:

[mw_shl_code=text,true]func
runDashboard(addr
string, httpLogFile string) {
    log.Info("dashboard listening on addr: ", addr)
    m := martini.Classic()
    ...

    m.Use(martini.Static(filepath.Join(binRoot, "assets/statics")))
    m.Use(render.Renderer(render.Options{
        Directory:  filepath.Join(binRoot, "assets/template"),
        Extensions: []string{".tmpl", ".html"},
        Charset:    "UTF-8",
        IndentJSON: true,
    }))

    m.Get("/api/server_groups", apiGetServerGroupList)
    m.Get("/api/overview", apiOverview)

    m.Get("/api/redis/:addr/stat", apiRedisStat)
    m.Get("/api/redis/:addr/:id/slotinfo", apiGetRedisSlotInfo)
    m.Get("/api/redis/group/:group_id/:slot_id/slotinfo", apiGetRedisSlotInfoFromGroupId)
    ...

    // create temp node in ZK
    if err := createDashboardNode(); err != nil {
        Fatal(err)
    }
    defer releaseDashboardNode()

    // create long live migrate manager
    conn := CreateZkConn()
    defer conn.Close()
    globalMigrateManager = NewMigrateManager(conn, globalEnv.ProductName(), preMigrateCheck)
    defer globalMigrateManager.removeNode()

    m.RunOnAddr(addr)
}[/mw_shl_code]
值得注意的是,Dashboard使用的是Martini框架,非常容易就能暴露各种RESTFul接口。这里不深入研究Martini框架的用法了,但提一个Java中的“山寨Martini”框架-SparkJava。Spark这个名字实在太火了,这个框架跟分布式内存计算的那个Spark框架可没有一点关系。稍有些遗憾的是,使用SparkJava的前提是必须安装JDK 8,因为SparkJava大量使用了JDK 8中的特性:
[mw_shl_code=text,true]import

static spark.Spark.*;

// Visit http://localhost:4567/hello
public class HelloWorld {
    public static void main(String[] args) {
        get("/hello", (req, res) -> "Hello World");
    }
}[/mw_shl_code]


2.3 Slot的角色
在开始剖析数据分布和迁移的源码之前,先说一下Codis中Slot的概念。Slot是Codis的数据分配、迁移等操作的基本单位,可以把Slot看成一致性哈希中的Bucket、VNode等概念。客户端传来的Key经过某种hash函数(Codis用的是Crc32)对应到某个Slot中,因为哈希函数是一定的所以这个对应关系是无法改变的。但我们可以改变的是Slot与Group的对应关系,这样当新增Group时就可以通过调整Slot的分布达到负载均衡的效果。
不幸的是,Redis中并没有Slot这个概念,也就是说:虽然我们给Key分配好了Slot,但是一旦存入Redis后Key属于哪个Slot这个信息就丢失了。解决的方法有很多种,比如:
  • 1)在ZooKeeper中保存Key与Slot的对应关系,需要时就查询一下。这种方式类似HDFS中的NameNode,缺点是Key很多时会占用很多空间。
  • 2)数据迁移时遍历所有Key,用哈希函数现去算一下Key所属的Slot是否要迁移,缺点是迁移时计算量比较大,而且每个Slot迁移时都要去算可能有很多重复的计算量。
  • 3)保存到Redis之前在Key或Value中加入一些隐含信息,缺点是会改变业务的数据。
  • 4)修改Redis源码加入Slot的概念,在Redis中保存Key属于的Slot,并提供基于Slot的Migrate原子操作。Codis采取的是这种做法,缺点就是要修改Redis源码,以后升级Redis比较麻烦,尤其像Codis没有将改动封装到一个动态链接库则可能更为麻烦。
  • 5)利用Redis中database的概念替代Slot,GitHub上的xcodis采取的就是这种思想。缺点是每个Slot对应的Redis连接在使用前都要select到对应的数据库,否则就会修改到其他Slot的数据。


3.数据分布3.1 initslot.sh脚本
类似于start_dashboard.sh脚本,我们安装完Codis后执行的sample下./initslot.sh脚本也是通过codis-config完成slot的初始化工作的。打开initslot.sh脚本看看,发现它使用codis-config调用了slot的init和range-set两个方法:
[mw_shl_code=xml,true]#!/bin/sh

echo "slots initializing..."
../bin/codis-config -c config.ini slot init -f
echo "done"

echo "set slot ranges to server groups..."
../bin/codis-config -c  config.ini slot range-set 0 511 1 online
../bin/codis-config -c  config.ini slot range-set 512 1023 2 online
echo "done"[/mw_shl_code]

声明:因为codis-config除了启动Dashboard外,其主要作用就是封装RESTFul请求,代码比较简单,所以后面的源码剖析就都直接跳过codis-config的请求发送过程,直接跳到Dashboard接到请求后的处理过程,所有RESTFul API的实现都在dashboard_apis.go中。


3.2 初始化Slot
dashboard_apis.go中并没有直接实现初始化功能,而是调用了models包。实际上,Codis提取了一整套的Model类作为模型层,Dashboard和Proxy都引用了这套模型层。在Codis 2.0中Proxy的内部架构发生了不小的变化,然而模型层的代码相对很稳定,这也是DDD领域驱动设计的优势吧。
[mw_shl_code=text,true]func
apiInitSlots(r *http.Request) (
int, string) {
    r.ParseForm()

    isForce := false
    val := r.FormValue("is_force")
    if len(val) > 0 && (val == "1" || val == "true") {
        isForce = true
    }

    conn := CreateZkConn()
    defer conn.Close()

    if err := models.InitSlotSet(conn, globalEnv.ProductName(), models.DEFAULT_SLOT_NUM); err != nil {
        log.Warning(err)
        return 500, err.Error()
    }
    return jsonRetSucc()
}[/mw_shl_code]


下面就看一下Slot模型层,代码位置在pkg/models/slot.go。根据totalSlotNum(默认1024)逐一创建Slot对象,初始时GroupId是无效值-1。然后调用Slot对象的Update()函数保存到ZooKeeper或etcd中。
[mw_shl_code=text,true]func
InitSlotSet(zkConn zkhelper.Conn, productName
string, totalSlotNum int) error {
    for i := 0; i < totalSlotNum; i++ {
        slot := NewSlot(productName, i)
        if err := slot.Update(zkConn); err != nil {
            return errors.Trace(err)
        }
    }
    return nil
}

func NewSlot(productName string, id int) *Slot {
    return &Slot{
        ProductName: productName,
        Id:          id,
        GroupId:     INVALID_ID,
        State: SlotState{
            Status:   SLOT_STATUS_OFFLINE,
            LastOpTs: "0",
            MigrateStatus: SlotMigrateStatus{
                From: INVALID_ID,
                To:   INVALID_ID,
            },
        },
    }
}

func (s *Slot) Update(zkConn zkhelper.Conn) error {
    data, err := json.Marshal(s)
    zkPath := GetSlotPath(s.ProductName, s.Id)
    _, err = zkhelper.CreateOrUpdate(zkConn, zkPath, string(data), 0, zkhelper.DefaultFileACLs(), true)
    ...
}[/mw_shl_code]


个人觉得Codis的模型层写的不是太好,虽然提取了最重要的公共逻辑,但是模型层混入了太多的存储代码,包括JSON序列化和与ZooKeeper通信。再加上Golang本身的异常返回值判断,导致模型的很多函数代码都比较乱。
当然,也没必要严格遵照DDD将存储逻辑划分到Repository,甚至将模型划分成Value和Entity。而且Codis作者是短时间内完成开发的,所以不能“吹毛求疵”,看看后续Codis升级版本能否梳理出更加清晰的模型层。感觉ActiveRecord模式非常适合这种对ZooKeeper进行简单CRUD的场景,都是“单表”(ZooKeeper中的结点)操作而不存在关联操作,详细见最后一部分的分析。


3.3 分配Range
同样地,设置Range也是在Slot模型层中实现的。在./initslot.sh中,我们将0~511 Slot分配给了Group 1,将512~1023分配给Group 2。SetSlotRange()将范围内的Slot的GroupId从-1改为新分配的GroupId,并更新到ZooKeeper中:
[mw_shl_code=text,true]func
SetSlotRange(zkConn zkhelper.Conn, productName
string, fromSlot, toSlot, groupId int, status SlotStatus) error {
    ...
    for i := fromSlot; i <= toSlot; i++ {
        s, err := GetSlot(zkConn, productName, i)
        if err != nil {
            return errors.Trace(err)
        }
        s.GroupId = groupId
        s.State.Status = status
        data, err := json.Marshal(s)
        if err != nil {
            return errors.Trace(err)
        }

        zkPath := GetSlotPath(productName, i)
        _, err = zkhelper.CreateOrUpdate(zkConn, zkPath, string(data), 0, zkhelper.DefaultFileACLs(), true)
        if err != nil {
            return errors.Trace(err)
        }
    }
    ...
}[/mw_shl_code]

4.数据迁移
Codis的数据迁移实现了两个特性:
  • 不停机:迁移过程中,客户端可以继续通过Proxy访问Redis,不需要停机等待迁移完成。
  • 平滑:整个迁移过程不追求快速完成,而是非常平滑地完成,不会对正常业务产生影响。


4.1 Rebalance算法
下面就是Codis的Rebalance算法。两个辅助函数getLivingNodeInfos()和getQuotaMap()会拿到所有存活的Group以及其目标Quota。其中存活结点对象中保存的就是Group目前拥有的Slot数和内存数,而所谓目标Quota配额指的就是根据这些指标计算出Rebalance后每个Group应有的Slot数。
Codis目前使用的算法比较简单,不要被Rebalance()中的三层嵌套循环“吓住”了。具体来说,假如之前我们有Group 1负责Slot 0~511,Group 2负责Slot 512~1023。现在新增一个Group 3,没有负责任何Slot,此时通过命令行或Web管理页面触发”Auto Rebalance”。
因为Group 1和2的当前Slot肯定大于目标Quota,所以遍历到它俩时什么都不会做,主要处理就在Group 3。当遍历到Group 3时,它会从Group 1和2中不断迁移过来Slot,直到达到目标Quota再停止。
[mw_shl_code=text,true]// experimental simple auto rebalance :)

func Rebalance(zkConn zkhelper.Conn, delay int) error {
    targetQuota, err := getQuotaMap(zkConn)
    livingNodes, err := getLivingNodeInfos(zkConn)
    log.Info("start rebalance")
    for _, node := range livingNodes {
        for len(node.CurSlots) > targetQuota[node.GroupId] {
            for _, dest := range livingNodes {
                if dest.GroupId != node.GroupId && len(dest.CurSlots) < targetQuota[dest.GroupId] {
                    slot := node.CurSlots[len(node.CurSlots)-1]
                    // create a migration task
                    t := NewMigrateTask(MigrateTaskInfo{
                        Delay:      delay,
                        FromSlot:   slot,
                        ToSlot:     slot,
                        NewGroupId: dest.GroupId,
                        Status:     MIGRATE_TASK_MIGRATING,
                        CreateAt:   strconv.FormatInt(time.Now().Unix(), 10),
                    })
                    u, err := uuid.NewV4()
                    t.Id = u.String()

                    if ok, err := preMigrateCheck(t); ok {
                        // do migrate
                        err := t.run()
                    }
                    node.CurSlots = node.CurSlots[0 : len(node.CurSlots)-1]
                    dest.CurSlots = append(dest.CurSlots, slot)
                }
            }
        }
    }
    log.Info("rebalance finish")
    return nil
}

func getQuotaMap(zkConn zkhelper.Conn) (map[int]int, error) {
    nodes, err := getLivingNodeInfos(zkConn)
    ret := make(map[int]int)
    var totalMem int64
    totalQuota := 0
    for _, node := range nodes {
        totalMem += node.MaxMemory
    }

    for _, node := range nodes {
        quota := int(models.DEFAULT_SLOT_NUM * node.MaxMemory * 1.0 / totalMem)
        ret[node.GroupId] = quota
        totalQuota += quota
    }

    // round up
    if totalQuota < models.DEFAULT_SLOT_NUM {
        for k, _ := range ret {
            ret[k] += models.DEFAULT_SLOT_NUM - totalQuota
            break
        }
    }
    return ret, nil
}

func getLivingNodeInfos(zkConn zkhelper.Conn) ([]*NodeInfo, error) {
    groups, err := models.ServerGroups(zkConn, globalEnv.ProductName())
    slots, err := models.Slots(zkConn, globalEnv.ProductName())
    slotMap := make(map[int][]int)
    for _, slot := range slots {
        if slot.State.Status == models.SLOT_STATUS_ONLINE {
            slotMap[slot.GroupId] = append(slotMap[slot.GroupId], slot.Id)
        }
    }
    var ret []*NodeInfo
    for _, g := range groups {
        master, err := g.Master(zkConn)
        out, err := utils.GetRedisConfig(master.Addr, "maxmemory")
        maxMem, err := strconv.ParseInt(out, 10, 64)
        node := &NodeInfo{
            GroupId:   g.Id,
            CurSlots:  slotMap[g.Id],
            MaxMemory: maxMem,
        }
        ret = append(ret, node)
    }
    return ret, nil
}[/mw_shl_code]


具体每个Slot是如何迁移的“秘密”就在MigrateTask中,下面就去一探究竟!


4.2 Migrate任务
MigrateTask每次迁移一个Slot,并更新进度给Web监控页面显示。migrateSingleSlot()函数中首先修改Slot的状态,然后再调用Migrator做迁移。SetMigrateStatus()并不是简单的修改Slot状态保存到ZooKeeper中就完成了,它还肩负着与各个Proxy做Pre-migrate确认的工作,下面就会看到这个过程。注意Codis这里对Migrator做了抽象,提取出了接口。
[mw_shl_code=text,true]// migrate multi slots

func (t *MigrateTask) run() error {
    // create zk conn on demand
    t.zkConn = CreateZkConn()
    defer t.zkConn.Close()

    to := t.NewGroupId
    t.Status = MIGRATE_TASK_MIGRATING
    for slotId := t.FromSlot; slotId <= t.ToSlot; slotId++ {
        err := t.migrateSingleSlot(slotId, to)
        t.Percent = (slotId - t.FromSlot + 1) * 100 / (t.ToSlot - t.FromSlot + 1)
        log.Info("total percent:", t.Percent)
    }
    t.Status = MIGRATE_TASK_FINISHED
    log.Info("migration finished")
    return nil
}

func (t *MigrateTask) migrateSingleSlot(slotId int, to int) error {
    // set slot status
    s, err := models.GetSlot(t.zkConn, t.productName, slotId)
    from := s.GroupId
    if s.State.Status == models.SLOT_STATUS_MIGRATE {
        from = s.State.MigrateStatus.From
    }

    // modify slot status
    if err := s.SetMigrateStatus(t.zkConn, from, to); err != nil {
        log.Error(err)
        return err
    }

    err = t.slotMigrator.Migrate(s, from, to, t, func(p SlotMigrateProgress) {
        // on migrate slot progress
        if p.Remain%500 == 0 {
            log.Info(p)
        }
    })

    // migrate done, change slot status back
    s.State.Status = models.SLOT_STATUS_ONLINE
    s.State.MigrateStatus.From = models.INVALID_ID
    s.State.MigrateStatus.To = models.INVALID_ID
    if err := s.Update(t.zkConn); err != nil {
        log.Error(err)
        return err
    }
    return nil
}[/mw_shl_code]


4.3 Pre-migrate确认
SetMigrateStatus()首先会新建一个Action与各个Proxy进行确认,确认完毕后才会将Slot的状态改为Migrate。
[mw_shl_code=text,true]func
(s *Slot) SetMigrateStatus(zkConn zkhelper.Conn, fromGroup, toGroup
int) error {
    // wait until all proxy confirmed
    err := NewAction(zkConn, s.ProductName, ACTION_TYPE_SLOT_PREMIGRATE, s, "", true)

    s.State.Status = SLOT_STATUS_MIGRATE
    s.State.MigrateStatus.From = fromGroup
    s.State.MigrateStatus.To = toGroup
    s.GroupId = toGroup
    return s.Update(zkConn)
}[/mw_shl_code]
Action也是Codis中非常重要的一个Model,它是Dashboard与Proxy之间通过ZooKeeper交换信息的数据类。NewAction()函数经过删减还是这么一大堆,其实它要做的简单说来就是:1)创建Action;2)取到存活的Proxy列表;3)保存Action到actions和ActionResponse路径下;4)等待所有Proxy的确认消息。
这里除了之前提到的Codis的模型层混入了很多存储和序列化的代码以及异常返回值判断外,还有一点就是:用ZooKeeper实现消息队列很“蹩脚”。Codis的方法是先创建一个ZooKeeper的Sequential结点,目的就是获得一个唯一的序列号,然后立刻删除这个结点。最后使用这个序列号在actions和ActionResponse路径下创建真正的结点。
[mw_shl_code=text,true]func
NewAction(zkConn zkhelper.Conn, productName
string, actionType ActionType, target interface{}, desc string, needConfirm bool) error {
    ts := strconv.FormatInt(time.Now().Unix(), 10)
    action := &Action{
        Type:   actionType,
        Desc:   desc,
        Target: target,
        Ts:     ts,
    }

    // set action receivers
    proxies, err := ProxyList(zkConn, productName, func(p *ProxyInfo) bool {
        return p.State == PROXY_STATE_ONLINE
    })
    for _, p := range proxies {
        buf, err := json.Marshal(p)
        if err != nil {
            return errors.Trace(err)
        }
        action.Receivers = append(action.Receivers, string(buf))
    }

    b, _ := json.Marshal(action)

    prefix := GetWatchActionPath(productName)
    //action root path
    err = CreateActionRootPath(zkConn, prefix)

    //response path
    respPath := path.Join(path.Dir(prefix), "ActionResponse")
    err = CreateActionRootPath(zkConn, respPath)

    //create response node, etcd do not support create in order directory
    //get path first
    actionRespPath, err := zkConn.Create(respPath+"/", b, int32(zk.FlagSequence), zkhelper.DefaultFileACLs())

    //remove file then create directory
    zkConn.Delete(actionRespPath, -1)
    actionRespPath, err = zkConn.Create(actionRespPath, b, 0, zkhelper.DefaultDirACLs())

    suffix := path.Base(actionRespPath)

    // create action node
    actionPath := path.Join(prefix, suffix)
    _, err = zkConn.Create(actionPath, b, 0, zkhelper.DefaultFileACLs())

    if needConfirm {
        if err := WaitForReceiver(zkConn, productName, actionRespPath, proxies); err != nil {
            return errors.Trace(err)
        }
    }
    return nil
}[/mw_shl_code]

WaitForReceiver()等待Proxy确认的逻辑相对清晰一些,Proxy响应方式就是通过evtbus接收到消息后,将自己的Proxy模型类加入到ActionResponse路径下刚才新建的Action结点下,作为其子结点。WaitForReceiver()不断轮询子结点数,当等于所有存活Proxy数时就说明所有Proxy都确认应答了,可以返回了。如果发现某个Proxy没应答则将其置成下线,避免这个Proxy保存了过期的Slot与Group对应关系。
这里Codis没有清理掉已完成的ActionResponse下的Action结点,而是有专门的ActionGC()函数去处理,不得不说这部分的设计配合上ZooKeeper真是很复杂!
[mw_shl_code=text,true]func
WaitForReceiver(zkConn zkhelper.Conn, productName
string, actionZkPath string, proxies []ProxyInfo) error {
    times := 0
    var proxyIds []string
    var offlineProxyIds []string
    for _, p := range proxies {
        proxyIds = append(proxyIds, p.Id)
    }
    sort.Strings(proxyIds)
    // check every 500ms
    for times < 60 {
        if times >= 6 && (times*500)%1000 == 0 {
            log.Warning("abnormal waiting time for receivers", actionZkPath)
        }
        nodes, _, err := zkConn.Children(actionZkPath)
        var confirmIds []string
        for _, node := range nodes {
            id := path.Base(node)
            confirmIds = append(confirmIds, id)
        }
        if len(confirmIds) != 0 {
            sort.Strings(confirmIds)
            if utils.Strings(proxyIds).Eq(confirmIds) {
                return nil
            }
            offlineProxyIds = proxyIds[len(confirmIds)-1:]
        }
        times += 1
        time.Sleep(500 * time.Millisecond)
    }
    // set offline proxies
    for _, id := range offlineProxyIds {
        log.Errorf("mark proxy %s to PROXY_STATE_MARK_OFFLINE", id)
        if err := SetProxyStatus(zkConn, productName, id, PROXY_STATE_MARK_OFFLINE); err != nil {
            return err
        }
    }
    return ErrReceiverTimeout
}[/mw_shl_code]


4.4 Migrate过程
现在所有Proxy都确认了,终于可以迁移了!Migrate()会不断向Slot迁移的源Redis发送迁移命令”SLOTSMGRTTAGSLOT”,这个命令是Codis修改Redis源码自己实现的,每次会从这个Slot中随机迁移一个Key到目的Redis,并保证原子性。所以要用循环反复发送这个迁移命令到Redis,直到Slot中的所有Key都迁移完成了才算是完成。
[mw_shl_code=text,true]func
(m *CodisSlotMigrator) Migrate(slot *models.Slot, fromGroup, toGroup
int, task *MigrateTask, onProgress func(SlotMigrateProgress)) (err error) {
    groupFrom, err := models.GetGroup(task.zkConn, task.productName, fromGroup)
    groupTo, err := models.GetGroup(task.zkConn, task.productName, toGroup)
    fromMaster, err := groupFrom.Master(task.zkConn)
    toMaster, err := groupTo.Master(task.zkConn)

    c, err := redis.Dial("tcp", fromMaster.Addr)
    defer c.Close()

    _, remain, err := sendRedisMigrateCmd(c, slot.Id, toMaster.Addr)

    for remain > 0 {
        if task.Delay > 0 {
            time.Sleep(time.Duration(task.Delay) * time.Millisecond)
        }

        _, remain, err = sendRedisMigrateCmd(c, slot.Id, toMaster.Addr)
        if remain >= 0 {
            onProgress(SlotMigrateProgress{
                SlotId:    slot.Id,
                FromGroup: fromGroup,
                ToGroup:   toGroup,
                Remain:    remain,
            })
        }
    }
    return nil
}

func sendRedisMigrateCmd(c redis.Conn, slotId int, toAddr string) (int, int, error) {
    addrParts := strings.Split(toAddr, ":")
    if len(addrParts) != 2 {
        return -1, -1, ErrInvalidAddr
    }

    reply, err := redis.Values(c.Do("SLOTSMGRTTAGSLOT", addrParts[0], addrParts[1], MIGRATE_TIMEOUT, slotId))
    if err != nil {
        return -1, -1, err
    }

    var succ, remain int
    if _, err := redis.Scan(reply, &succ, &remain); err != nil {
        return -1, -1, err
    }
    return succ, remain, nil
}[/mw_shl_code]


4.5 迁移中的数据访问
对于迁移中的Slot,如果恰好此时有客户端要访问该Slot中的某个Key该怎么办?Codis不是遇到这个问题的第一个中间件,像Taobao Tair中也有对此的解决方案:
发生迁移的时候data server如何对外提供服务?
当迁移发生的时候, 我们举个例子, 假设data server A 要把 桶 3,4,5 迁移给data server B. 因为迁移完成前, 客户端的路由表没有变化, 客户端对 3, 4, 5 的访问请求都会路由到A. 现在假设 3还没迁移, 4 正在迁移中, 5已经迁移完成. 那么如果是对3的访问, 则没什么特别, 跟以前一样. 如果是对5的访问, 则A会把该请求转发给B,并且将B的返回结果返回给客户, 如果是对4的访问, 在A处理, 同时如果是对4的修改操作, 会记录修改log.当桶4迁移完成的时候, 还要把log发送到B, 在B上应用这些log. 最终A B上对于桶4来说, 数据完全一致才是真正的迁移完成
但Codis采取的是不同的策略。当迁移过程中发生数据访问时,Proxy会发送”slotsmgrttagone”迁移命令给Redis,强制将客户端要访问的Key立刻迁移,然后再处理客户端的请求。
[mw_shl_code=text,true]func
(s *Server) dispatch(r *PipelineRequest) {
    s.handleMigrateState(r.slotIdx, r.keys
[0])
    tr, ok := s.pipeConns[s.slots[r.slotIdx].dst.Master()]
    if !ok {
        //try recreate taskrunner
        if err := s.createTaskRunner(s.slots[r.slotIdx]); err != nil {
            r.backQ <- &PipelineResponse{ctx: r, resp: nil, err: err}
            return
        }
        tr = s.pipeConns[s.slots[r.slotIdx].dst.Master()]
    }
    tr.in <- r
}

func (s *Server) handleMigrateState(slotIndex int, key []byte) error {
    shd := s.slots[slotIndex]
    if shd.slotInfo.State.Status != models.SLOT_STATUS_MIGRATE {
        return nil
    }

    redisConn, err := s.pools.GetConn(shd.migrateFrom.Master())
    defer s.pools.ReleaseConn(redisConn)

    redisReader := redisConn.(*redispool.PooledConn).BufioReader()

    err = WriteMigrateKeyCmd(redisConn.(*redispool.PooledConn), shd.dst.Master(), 30*1000, key)
    ...
    return nil
}

func WriteMigrateKeyCmd(w io.Writer, addr string, timeoutMs int, key []byte) error {
    hostPort := strings.Split(addr, ":")
    if len(hostPort) != 2 {
        return errors.Errorf("invalid address " + addr)
    }
    respW := respcoding.NewRESPWriter(w)
    err := respW.WriteCommand("slotsmgrttagone", hostPort[0], hostPort[1],
        strconv.Itoa(int(timeoutMs)), string(key))
    return errors.Trace(err)
}[/mw_shl_code]


5.后记:AR模型层
前面曾提到过,对于存储到ZooKeeper的模型,个人感觉使用ActiveRecord模式(AR)会非常方便。以RubyOnRails中AR为例,下面是典型的AR用法:
[mw_shl_code=java,true]class User < ActiveRecord::Base

  
self.table_name = "tb_user"
end

user = User.new
user.name = "David"
user.occupation = "Code Artist"

user = User.find_by(name: 'David')
user.name = 'Dave'
user.save

users = User.where(name: 'David', occupation: 'Code Artist').order('created_at DESC')[/mw_shl_code]


在Rails或者说Ruby中,User继承ActiveRecord::Base类,运行时User就会自动具有增删改查方法。然而,要想在Java中实现类似的ActiveRecord模式的模型层不是件容易事,主要问题在于Java的静态性。尽管字节码中不断新增各种新字段来保存运行时的类信息,从而增强反射功能,但限制依旧存在。
其实,保存、更新、删除都好办,因为这三种操作的前提是已经拿到模型的一个实例了。最核心的问题在查询上!要想在Java静态方法中获取正在调用的类实例信息貌似做不到啊。像jOOQ、JFinal等框架实现AR时都采取了不同的“迂回”策略,通过各种间接方式获得运行时的对象信息。以下是一种更为直接的方式,在各个具体模型类中自己根据需要定义查询方法。
[mw_shl_code=java,true]public

abstract class Model {

    private static CuratorFramework client;

    private static ObjectMapper mapper;

    public String save() {
        try {
            return client.create()
                        .creatingParentsIfNeeded()
                        .withMode(ephemeral() ? EPHEMERAL : PERSISTENT)
                        .forPath(path(), marshal());
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public void update() {
        try {
            client.setData().forPath(path(), marshal());
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** FIXME: Supposed to be static, yet cannot get class info in static context in Java */
    @SuppressWarnings("unchecked")
    public <T> T find() {
        try {
            return (T) unmarshal(client.getData().forPath(path()), getClass());
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
    ...
}

public class Slot extends Model {

    private int id;

    private SlotState state;

    private int groupId;

    /**
     * Initialize all slots.
     * @param totalNum      total number of slots
     */
    public static void initSlotSet(int totalNum) {
        try {
            for (int i = 0; i < totalNum; i++) {
                new Slot(i).save();
            }
        } catch (Exception e) {
            throw new IllegalStateException("Error when init slots", e);
        }
    }

    public static Slot find(int id) {
        Slot slot = new Slot();
        slot.setId(id);
        return slot.find();
    }
}[/mw_shl_code]


这样虽然在Java里实现起来不那么优雅,但是在Model抽象类中统一管理了ZooKeeper客户端(Curator)和序列化框架(Jackson),并对子类提供了仿AR式的接口,使用起来还是很方便的,而且代码结构也清晰了很多。

原文:http://blog.csdn.net/dc_726/article/details/47355989


已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条