前言

本节实现分布式 ID 生成系统采用雪花算法实现唯一 ID;实现缓存架构采用 LRU (最近最少使用算法

雪花算法

分布式 ID 生成算法的有很多种,Twitter雪花算法SnowFlake)就是其中经典的一种。

SnowFlake算法生成id结果一个64bit大小整数,它的结构下图

在这里插入图片描述

SnowFlake 算法的优点:

SnowFlake 算法的缺点:

以上参考cloudyan/snowflake


在本项目中,与之有异同之处。采用 39 位表示时间戳,12 位表示机器 id,12位表示序列号

实现后的雪花算法:

  1. 通过自定义雪花算法生成 id 的服务,即表示为机器 id,则可以实现至多

    2

    12

    =

    4096

    2^{12}=4096

    212=4096服务

  2. 由于 skynet 内部时钟精度是 10ms,所以在同一时间戳(10ms)内,生成 id 的序列号依此递增,至多

    2

    12

    =

    4096

    2^{12}=4096

    212=4096 个。

  3. 39 位用于表示时间戳,

    2

    39

    /

    (

    100

    3600

    24

    365

    )

    =

    174

    2^{39}/(100*3600*24*365)=174

    239/(100360024365)=174 年。

雪花算法服务配置文件

-- snowflake conf 
snowflake_begin         = 1 
snowflake_end           = 2
snowflake_start_date    = "2003-01-21"

lualib/snowflake.lua

local skynet = require "skynet"

local _M = {}
local snowflake_service = {} -- service: begin - end 
local max_service_id
local cur_service_id = 0

-- 获取一个 snowflake 服务
local function get_snowflake_service()
    cur_service_id = cur_service_id + 1
    if cur_service_id > max_service_id then 
        cur_service_id = 1
    end 

    return snowflake_service[cur_service_id]
end 

-- 对外接口,雪花 id 算法生成
function _M.snowflake()
    local addr = get_snowflake_service()
    return skynet.call(addr, "lua", "snowflake")
end 

skynet.init(function()
    skynet.uniqueservice("snowflake")

    local snowflake_begin = tonumber(skynet.getenv("snowflake_begin")) or 1
    local snowflake_end = tonumber(skynet.getenv("snowflake_end")) or 10
    assert(snowflake_begin <= snowflake_end, "snowflake_begin or snowflake_end error")

    local i = 0
    for id = snowflake_begin, snowflake_end do  
        i = i + 1
        local service_name = string.format(".snowflake_%s", id)
        snowflake_service[i] = skynet.localname(service_name) --  返回同一进程内,用 register 注册的具名服务的地址
    end 
    max_service_id = i
end)

return _M 

可以看到,服务采用主从架构通过简单轮询算法负载均衡。生成的服务数量由 snowflake_begin snowflake_end 配置

我们再来看 snowflake 服务代码

service/snowflake.lua

-------- master --------

-- 启动主节点服务,创建多个从节点服务
skynet.start(function()
    local snowflake_begin = tonumber(skynet.getenv("snowflake_begin")) or 1
    local snowflake_end = tonumber(skynet.getenv("snowflake_end")) or 10
    assert(snowflake_begin <= snowflake_end, "snowflake_begin or snowflake_end error")

    for id = snowflake_begin, snowflake_end do 
        skynet.newservice(SERVICE_NAME, "slave", id)
    end 
    skynet.register(".snowflake")
end)

主节点仅负责启动多个从节点服务,通过 skynet.newservice(SERVICE_NAME, "slave", id)启动并传入参数参数 id 则用于后续标识机器的 id。

从节点用于提供生成 ID 的雪花算法,并维护当前这个从服务的时间戳,定时每 3s 保存文件中。

-- 将 2000-01-01 形式日期,转为时间戳
local function parse_date(date)
    local year, month, day = date:match("(%d+)-(%d+)-(%d+)")
    return os.time({year = year, month = month, day = day})
end 
local start_date = skynet.getenv("snowflake_start_date") or "2000-01-01"
local START_TIMESTAMP = parse_date(start_date)

-- 每一部分占用位数
local TIME_BIT      = 39    -- 时间占用位数
local SEQUENCE_BIT  = 12    -- 序列号占用位数
local MACHINE_BIT   = 12    -- 机器标识占用位数

-- 每一部分最大值
local MAX_TIME      = 1 << TIME_BIT     -- 时间最大值      ((1 << 39) / 365 * 24 * 3600 * 100) ==&gt; 174 year
local MAX_SEQUENCE  = 1 << SEQUENCE_BIT -- 序列号最大值     (4096)
local MAX_MACHINE   = 1 << MACHINE_BIT  -- 机器标识最大值   (4096)

-- 每一部分向左的偏移
local LEFT_MACHINE  = SEQUENCE_BIT                  -- 12
local LEFT_TIME     = SEQUENCE_BIT + MACHINE_BIT    -- 24

-- snowflake 接口
function CMD.snowflake()
    local cur = get_cur_timestamp()
    if cur < last_timestamp then 
        error("Clock moved backwards.  Refusing to generate id")
    end 
    if cur == last_timestamp then 
        -- 相同 10ms 内,序列号自增
        sequence = (sequence + 1) &amp; MAX_SEQUENCE

        if sequence == 0 then 
            cur = get_next_timestamp()
        end 
    else 
        -- 不同 10ms 内,序列号置 0
        sequence = 0
    end 
    
    last_timestamp = cur

    return (cur - START_TIMESTAMP) << LEFT_TIME | slave_id << LEFT_MACHINE | sequence
end 

代码中可以看出,生成 id 的时间戳是相较于配置文件snowflake_start_date 起始的。并且在相同 10ms 内,序列号自增,如果序列超出 12 位的最大值,那么强制变为下一个 10ms 的时间戳。

雪花算法 snowflake,实际返回的 id:(cur - START_TIMESTAMP) << LEFT_TIME | slave_id << LEFT_MACHINE | sequence,即分别将时间戳、机器 id、序列号,向左偏移到二进制对应位置返回

-- 10ms
local function get_cur_timestamp()
    return math.floor(skynet.time() * 100)
end 

local function get_next_timestamp()
    local cur = get_cur_timestamp()
    while cur <= last_timestamp do 
        cur = get_cur_timestamp()
    end 
    return cur
end 

skynet.time通过 starttime 和 now 计算当前 UTC 时间(单位是秒, 精度是ms),get_cur_timestamp 获取当前时间戳函数控制了 10ms 为一个单位

完整代码service/snowflake


LRU 算法

缓存模块使用最经典的 LRU 算法实现,淘汰策略是最近最少使用数据。详细的介绍参考百度百科

LRU 算法在 leetcode 上也有相应试题,我们参考实现自己的 LRU 算法。

Go 语言版本

type entry struct {
    key int 
    value int 
}

type LRUCache struct {
    ll          *list.List
    cache       map[int]*list.Element
    maxBytes    int 
    nBytes      int
}


func Constructor(capacity int) LRUCache {
    lru := LRUCache{}
    lru.ll = list.New()
    lru.cache = make(map[int]*list.Element)
    lru.maxBytes = capacity
    lru.nBytes = 0
    return lru
}

func RemoveOldest(this *LRUCache) {
    ele := this.ll.Back()
    if ele != nil {
        this.ll.Remove(ele)
        delete(this.cache, ele.Value.(*entry).key)
        this.nBytes -= 1
    }
}

func (this *LRUCache) Get(key int) int {
    if ele, ok := this.cache[key]; ok {
        this.ll.MoveToFront(ele)
        return ele.Value.(*entry).value
    }
    return -1
}


func (this *LRUCache) Put(key int, value int)  {
    if ele, ok := this.cache[key]; ok {
        this.ll.MoveToFront(ele)
        ele.Value = &amp;entry{key, value}
    } else {
        ele := this.ll.PushFront(&amp;entry{key, value})
        this.cache[key] = ele 
        this.nBytes += 1 
    }

    for this.maxBytes < this.nBytes &amp;&amp; this.maxBytes != 0 {
        RemoveOldest(this)
    }
}
/**
 * Your LRUCache object will be instantiated and called as such:
 * obj := Constructor(capacity);
 * param_1 := obj.Get(key);
 * obj.Put(key,value);
 */

根据上述 Go 语言实现的 LRU,需要一个双向链表模块,还有一个哈希表。哈希表在 lua 中实际就是 table,那么下面首先实现双向链表结构
lualib/list.lua

local list = {} 
local mt = { __index = list }

-- entry { key, value, next, prev }

function list.New()
    local self = setmetatable({}, mt)
    self.size = 0
    self.head = {}
    self.tail = {} 
    self.head.next = self.tail 
    self.tail.prev = self.head 
    return self 
end 

function list.Back(self)
    if self.size ~= 0 then 
        return self.tail.prev 
    end 
    return nil 
end 

-- insert entry after at; list.size++; return entry
local function insert(self, entry, at)
    entry.prev = at 
    entry.next = at.next 
    entry.prev.next = entry 
    entry.next.prev = entry
    self.size = self.size + 1
    return entry
end 

function list.PushFront(self, entry)
    return insert(self, entry, self.head)
end 

-- move entry after at;
local function move(self, entry, at)
    if entry == at then 
        return 
    end 

    entry.prev.next = entry.next 
    entry.next.prev = entry.prev

    entry.prev = at
    entry.next = at.next

    entry.prev.next = entry
    entry.next.prev = entry
end 

function list.MoveToFront(self, entry)
    if entry == self.head or self.size <= 1 then 
        return 
    end 

    move(self, entry, self.head)
end 

function list.Remove(self, entry)
    if entry == nil then 
        return 
    end

    entry.prev.next = entry.next
    entry.next.prev = entry.prev
    entry.next, entry.prev, entry.key, entry.value = nil, nil, nil, nil
    entry = nil  

    self.size = self.size - 1
end 

return list 

设计的对外接口仅和 Go 语言代码一致,满足后续的 LRU 算法模块实现,这里不再过多赘述双向列表的实现。

下面来看 LRU 模块设计lru.lua

主要实现了 newsetget 三个方法

function lru.new(size, on_remove)
    local self = setmetatable({}, mt)
    self.list = list.New()
    self.cache = {} 
    self.capacity = size 
    self.size = 0
    self.on_remove = on_remove
    return self 
end 

function lru.set(self, key, value, force)
    local entry = self.cache[key]
    if entry then 
        entry.value = value
        self.list:MoveToFront(entry)
    else 
        local entry = {
            key = key,
            value = value
        }
        self.list:PushFront(entry)
        self.cache[key] = entry
        self.size = self.size + 1
    end 

    while true do 
        if self.size > self.capacity and not force then
            lru_remove(self)
        else
            break 
        end 
    end 
end 

function lru.get(self, key)
    local entry = self.cache[key]
    if entry == nil then 
        return 
    end 
    self.list:MoveToFront(entry)
    return entry.value
end 

lru 模块,不仅要有 list 双向链表结构cache 哈希结构capacity 缓存容量上限,size 当前数据量,还需要一个 on_remove 回调方法用于缓存结构移除数据时,执行该数回调操作。由使用者进行注册,并且一个 lru 模块所有数据共享这一个回调方法,即执行的回调操作是相同的。例如后续实现的缓存模块的 lru 结构回调方法,在删除数据时后,都会判断一下这个数是否还有引用,还有则继续插入缓存。

还注意到,set 方法提供了一个额外参数 force。可以强制无视当前 lru 容量,进行插入缓存数据

完整代码lualib/lru


通过 debug_consolelru 模块进行测试

设置 lru 容量是 2,插入数据 [1, 1],[2, 2],输出 [[2, 2],[1, 1]]。

在这里插入图片描述

获取数据 [1],输出 [[1, 1],[2, 2]]。

在这里插入图片描述

插入数据 [3, 3],输出 [[3, 3],[1, 1]]。

在这里插入图片描述

不做过多演示测试代码参考test/test_lru


缓存模块

一般游戏逻辑都不直接操作数据库,而是直接操作内存数据库,也称为数据缓存。游戏可以使用 redis 作为内存数据库,也可以和本项目一样实现一个缓存服务。

缓存模块库:lualib/cache.lua

local skynet = require "skynet"

local _M = {}

local cached 

function _M.call_cached(func_name, mod, sub_mod, id, ...)
    return skynet.call(cached, "lua", "run", func_name, mod, sub_mod, id, ...)
end

skynet.init(function()
    cached = skynet.uniqueservice("cached")
end)

return _M 

外接方法 call_cached


缓存服务 service/cached.lua,该服务的管理模块 module/cached/mng.lua,其余还有不同逻辑模块,例如用户模块 module/cached/user.lua 等。

service/cached.lua

local skynet = require "skynet"
local mng = require "cached.mng"
local user = require "cached.user"

local CMD = {}

function CMD.run(func_name, mod, sub_mod, id, ...)
    local func = mng.get_func(mod, sub_mod, func_name)
    local cache = mng.load_cache(mod, sub_mod, id)
    return func(id, cache, ...)
end 

function CMD.SIGHUP()
    logger.info(SERVICE_NAME, "SIGHUP to save db. Doing.")
    mng.do_save_loop()
    logger.info(SERVICE_NAME, "SIGHUP to save db. Down.")
end

skynet.start(function()
    skynet.dispatch("lua", function(_, _, cmd, ...)
        local f = assert(CMD[cmd])
        skynet.ret(skynet.pack(f(...)))
    end)
    mng.init()
    user.init()
end)

缓存服务 cached 主要提供两个接口run 用于执行远程函数SIGHUP 用于接受关服信号执行一次脏数据落盘。

还记得在日志服务中 log.lua,我们注册系统消息 PTYPE_SYSTEM

-- 捕捉sighup信号kill -l) 执行安全关服逻辑
skynet.register_protocol {
    name = "SYSTEM", 
    id = skynet.PTYPE_SYSTEM, 
    unpack = function(...) return ... end,
    dispatch = function()
        -- 执行必要服务的安全退出操作
        local cached = skynet.localname(".cached")
        if cached then 
            skynet.call(cached, "lua", "SIGHUP")
        end 

        skynet.sleep(100)
        skynet.abort()
    end 
}

外部停止服务器时,这里就执行一次关服保存数据操作通知缓存模块进行脏数据落盘。如何更好的更安全的退出 skynet,参考https://github.com/cloudwu/skynet/issues/288

服务的另一个接口run 执行远程函数,首先通过 get_func 函数接受 modsub_modfunc_name 三个参数组内部函数名称对应获取要执行的函数。在通过 load_cache加载该函数要操作对象内部先去查找缓存,缓存未命中则会从数据库加载。缓存表中数据字段_key索引,数据对象modid 构成唯一 _key


下面来看缓存的管理模块,这个模块是缓存操作核心管理了所有的缓存相关处理逻辑。

module/cached/mng.lua

local _M = {}
local CMD = {}
local cache_list    -- 缓存列表
local dirty_list    -- 脏数据列表
local load_queue    -- 数据加载队列
local mongo_col     -- 数据库操作对象
local init_cb_list = {} -- 数据加载后的初始化函数列表

-- 缓存移除回调函数
local function cache_remove_cb(key, cache)
    -- 数据脏或仍有引用,继续存入缓存
    if cache._ref > 0 or dirty_list[cache] then 
        cache_list:set(key, cache, true)
    end 
end

function _M.init()
    init_db()
    local max_cache_cnt = tonumber(skynet.getenv("cache_max_cnt")) or 10240
    local save_interval = tonumber(skynet.getenv("cache_save_interval")) or 60

    cache_list = lru.new(max_cache_cnt, cache_remove_cb)
    dirty_list = {}
    load_queue = queue()

    timer.timeout_repeat(save_interval, _M.do_save_loop)
end

先来看基础变量,和模块的初始化

创建 cache_list 对象时,指定了当前缓存结构的数据移除回调函数 cache_remove_cb,数据还有引用该数据还是脏数据 cache._ref > 0 or dirty_list[cache] ,那么重新加入缓存列表中 cache_list:set(key, cache, true)这里 lruset 方法第三个参数为 true 表示允许缓存列表临时超出上限,避免死循环执行 cache_remove_cb 回调函数。

最后,我们启动了一个定时器save_interval 时间间隔执行一次 do_save_loop 进行脏数据落盘。

-- 缓存同步到数据库
local function cache_save_db(key, cache)
    local data = {
        ['$set'] = cache
    }
    local xpcallok, updateok, err, ret = xpcall(mongo_col.safe_update, debug.traceback, mongo_col, { _key = key }, data, true, false)
    if not xpcallok or not (updateok and ret and ret.n == 1) then end 
end

-- 脏的缓存数据写到数据库
function _M.do_save_loop()
    for key, _ in pairs(dirty_list) do
        local cache = cache_list:get(key)
        if cache then 
            cache_save_db(key, cache)
        end
        dirty_list[key] = nil  
    end 
end

实际每轮保存数据就是去遍历当前的 dirty_list 脏数据列表,执行 cache_save_db 将缓存 update 到 Mongodb 数据库。

该模块是缓存管理模块,具体每个模块逻辑,都会新建相应的模块处理,并将对外提供的接口按管理模块指定方式进行注册如下述代码:


-- 注册模块执行函数
-- mod_id 组合数据库索引字段 key
local function get_key(mod, id)
    return string.format("%s_%s", mod, id) 
end 

-- mod_sub_mod_func_name 组合执行函数名
local function get_func_name(mod, sub_mod, func_name)
    return string.format("%s_%s_%s", mod, sub_mod, func_name)
end 


function _M.register_cmd(mod, sub_mod, func_list)
    for func_name, func in pairs(func_list) do 
        func_name = get_func_name(mod, sub_mod, func_name)
        CMD[func_name] = func
    end 
end 

-- 注册模块数据初始化函数
function _M.register_init_cb(mod, sub_mod, init_cb)
    if not init_cb_list[mod] then 
        init_cb_list[mod] = {}
    end 
    init_cb_list[mod][sub_mod] = init_cb
end

get_key对应缓存数据存储在数据库的 _key 字段,由 mod 和 id 拼接而成,在 init_db 中,有创建索引 mongo_col:createIndex({{_key = 1}, unique = true})

get_func_name对应管理模块中存储不同模块的对外方法,以 mod、sub_mod、func_name 拼接而成,保证了唯一性。

同时,还提供了两个注册方法,用于注册不同模块的 远程调用函数,数据初始化回调函数。

我们先来简单看一下 module/cached/user.lua 模块,理解一下这里的注册方法。

local mng = require "cached.mng"

local _M = {}
local CMD = {}


function _M.init()
    mng.register_cmd("user", "user", CMD)
    mng.register_init_cb("user", "user", init_cb)
end

return _M 

cached 服务启动时,会执行不同具体逻辑模块的 init 函数。

对于用户 user 模块,初始化时,调用两个注册方法,将自己的逻辑方法和本模块相关数据初始化回调方法,都注册到了管理模块中。

从上述我们了解到,之后封装模块进行数据逻辑处理,也是同理实现即可


下面来看管理模块如何获取远程执行函数:

-- 释放缓存
function _M.release_cache(mod, id, cache)
    local key = get_key(mod, id)
    cache._ref = cache._ref - 1
    if cache._ref < 0 then 
        logger.error(SERVICE_NAME, "cache ref wrong", "key: ", key, "ref: ", ref)
    end 
end

-- 获取执行函数
function _M.get_func(mod, sub_mod, func_name)
    func_name = get_func_name(mod, sub_mod, func_name)
    logger.debug(SERVICE_NAME, "Get func_name: ", func_name)

    local f = assert(CMD[func_name])
    return function(id, cache, ...)
        local ret = table.pack(pcall(f, id, cache, ...))
        _M.release_cache(mod, id, cache)
        return select(2, table.unpack(ret))
    end
end 

其他服务调用缓存模块(lualib/cache.lua)时,通过对外提供的 call_cached API 调用缓存服务(service/cache.lua)的 run 方法,首先执行的第一步就是 get_func,从缓存管理模块(module/cached/mng.lua)中获取对应可执行的函数,也就是这里get_func 返回闭包函数。

通过闭包的形式返回,为了保证每次执行完成后相应逻辑后,维护当前数据对象的正确引用获取到的该函数,是在加载数据 load_cache 之后执行,而 load_cache 中会改变数据对象的引用。下面来看相关代码:

-- 从数据库中加载数据
local function load_db(key, mod, sub_mod, id)
    local ret = mongo_col:findOne({ _key = key })
    if not ret then 
        local data = {
            _key = key,
        }
        local ok, err, ret = mongo_col:safe_insert(data)
        if (ok and ret and ret.n == 1) then 
            run_init_cb(mod, sub_mod, id, data)
            return key, data 
        else
            return 0, "New data error: " .. err
        end
    else
        if not ret._key then 
            return 0, "cannot load data. key: " .. key
        end 
        run_init_cb(mod, sub_mod, id, ret)
        return ret._key, ret
    end 
end

-- 从缓存中加载数据
function _M.load_cache(mod, sub_mod, id)
    local key = get_key(mod, id)
    local cache = cache_list:get(key)
    if cache then 
        cache._ref = cache._ref + 1
        dirty_list[key] = true 
        return cache
    end 

    local _key, cache = load_queue(load_db, key, mod, sub_mod, id)
    assert(_key == key)
    cache_list:set(key, cache)
    cache._ref = 1
    dirty_list[key] = true
    return cache
end 

加载数据实则是先进行缓存加载,未命中则进行数据库加载,数据库若中没有数据,则新建数据插入并返回。

每次从数据库中取出数据后,都会执行相关的数据初始化回调函数。如果是全新数据创建插入数据库,并对该数据进行初始化。如果数据是已经存在的,也会取出进行初始化。所以,在不同具体模块实现模块的数据初始化回调时,要考虑这点,而不是一味的当作新数据的初始化。例如用户模块:

local function init_cb(uid, cache)
    if not cache.username then 
        cache.username = "New Player"
    end 
    if not cache.lv then
        cache.lv = 1
    end
    if not cache.exp then
        cache.exp = 0
    end
end

这样初始化,保证了只会对不存在字段的赋值,如果数据已经有了,并不会影响

相关完整代码参考module/cached/mng.lua


数据库

客户端登录,由看门狗校验,而后登录逻辑在代理服务中执行。代理的逻辑模块中,对客户端登录的处理是先去数据库查找是否存在当前用户,不存在则进行创建,该用户账号表在数据库中,设计如下

字段 描述
uid 用户唯一ID
acc 用户账号

用户的账号信息存在 game 数据库的 account 表下。

取出当前用户信息后,还会执行用户游戏信息的加载,通过向缓存模块发起 get_userinfo 消息,获取用户的历史信息。用户的游戏内信息设计如下

字段 描述
uid 用户唯一ID
username 用户昵称
lv 用户等级
exp 用户当前经验值

用户的游戏信息存在 cache 数据库的 cached 表下。

配置文件中,mongodb_db_namecache_db_name两个配置字段可以修改上述两张表存在的数据库名。表名则没做配置,写死在了对应模块的初始化数据库代码逻辑中。


这里以用户登录注册的例子来看数据库模块的实现:

module/ws_agent/mng.lua

function _M.login(acc, fd)
    -- 数据库加载数据
    local uid = db.find_and_create_user(acc)

    local user = {
        fd = fd, 
        acc = acc,
    }
    online_users[uid] = user 
    fd2uid[fd] = uid 
	
    -- 加载玩家信息
    local userinfo = cache.call_cached("get_userinfo", "user", "user", uid)

    local res = {
        pid = "s2c_login",
        msg = "Login success",
        uid = userinfo.uid, 
        username = userinfo.username, 
        lv = userinfo.lv, 
        exp = userinfo.exp,
    }
    return res
end

登录逻辑同上述说的,这里调用db 模块,是代理对应的数据库处理模块。完整代码:module/ws_agent/mng.lua

module/ws_agent/db.lua

local _M = {}

local mongo_col -- account 表操作对象

-- game.account
function _M.init()
end 

local function call_create_new_user(acc)
    local uid = tostring(snowflake.snowflake())
    local user_data = {
        uid = uid,
        acc = acc, 
    }
    local ok, err, ret = mongo_col:safe_insert(user_data)
    if (ok and ret and ret.n == 1) then 
        return uid, user_data
    else
        return 0, "New user error: " .. err
    end 
end 

local function call_load_user(acc)
    local ret = mongo_col:findOne({acc = acc})
    if not ret then 
        return call_create_new_user(acc)
    else 
        if not ret.uid then 
            return 0, "Load user error, acc: " .. acc 
        end 
        return ret.uid, ret 
    end 
end 

local loading_user = {}
function _M.find_and_create_user(acc)
    if loading_user[acc] then 
        return 0, "already loading"
    end 
    loading_user[acc] = true 
    local ok, uid, data = xpcall(call_load_user, debug.traceback, acc)
    loading_user[acc] = nil 
    if not ok then 
        local err = uid 
        return 0, err 
    end 
    return uid, data
end 

return _M 

本模块通过 loading_user 正在加载的用户数据标识表,防止重入call_load_user 会执行数据库操作,是一个阻塞操作,同之前缓存管理模块中的 skynet.queue 性质相识。不过在这里,我们是保证执行加载数据操作,无需在同一相近时间段内多次加载,而不是用 skynet.queue 来保证这多次加载操作的时序问题

完整代码:module/ws_agent/db.lua


测试逻辑

设计获取和修改用户名协议

-- client
{
	pid = "c2s_get_username"
}

-- server
{
	pid = "s2c_get_username",
	username = "用户昵称"
}
-- client
{
	pid = "c2s_set_username",
	username = "用户昵称"
}

-- server
{
	pid = "s2c_set_username",
	msg = "是否设置成功消息"
}

客户端

test/cmds/ws.lua

function RPC.s2c_get_username(ws_id, res)
    logger.debug(SERVICE_NAME, "s2c_get_username: ", cjson.encode(res))
end 

function RPC.s2c_set_username(ws_id, res)
    logger.debug(SERVICE_NAME, "s2c_set_username: ", cjson.encode(res))
end 

function CMD.get_username(ws_id)
    local req = {
        pid = "c2s_get_username",
    }
    websocket.write(ws_id, cjson.encode(req))
end 

function CMD.set_username(ws_id, username)
    local req = {
        pid = "c2s_set_username",
        username = username,
    }
    websocket.write(ws_id, cjson.encode(req))
end 

服务端

module/ws_agent/mng.lua

-- c2s_get_username
function RPC.c2s_get_username(req, fd, uid)
    local userinfo = cache.call_cached("get_userinfo", "user", "user", uid)
    local res = {
        pid = "s2c_get_username",
        username = userinfo.username
    }
    return res 
end

-- c2s_set_username
function RPC.c2s_set_username(req, fd, uid)
    local ok = cache.call_cached("set_username", "user", "user", uid, req.username)
    local msg = "success set username: " .. req.username
    if not ok then
        msg = "failed set username"
    end 

    local res = {
        pid = "s2c_set_username",
        msg = msg,
    }
    return res 
end

module/cached/user.lua

function CMD.get_userinfo(uid, cache)
    local userinfo = {
        uid = uid, 
        username = cache.username,
        lv = cache.lv,
        exp = cache.exp,
    }
    return userinfo
end

function CMD.set_username(uid, cache, username)
    if not cache then 
        return false 
    end 
    cache.username = username
    return true 
end 

以上便是实现一条协议基本要修改的文件客户端需要添加协议对应处理方法 CMD添加网络消息接受方法 RPC服务端需要在代理模块添加网络行数据对应的协议处理函数 RPC,由于协议要从缓存获取,所以在缓存的用户模块中也要添加对应协议的处理方法 CMD

测试如下

在这里插入图片描述

在这里插入图片描述

如上述,数据成功上行到服务端并做相应逻辑处理,成功后返回给了客户端。并且数据库中的数据,也同步成功。


以上便是本章节全部内容项目源码同步https://gitee.com/Cauchy_AQ/skynet_practice

原文地址:https://blog.csdn.net/qq_52678569/article/details/134667615

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_24090.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注