Redis内存数据库示例分析
上后左爱 人气:0redies dict字典
这是 Redis 最底层的结构,比如 1个DB 下面有 16个Dict
1. 使用接口方式, 基础实现是simple_dict ,sync_dict ,后续用户可以根据自己的需求进行自定义的实现属于自己的Dict , 在 forEach 方法 支持匿名函数方式 type Dict interface { Get(key string) (val interface {}, exists bool) Len() // 回复上次 put 放入 Put(key string, val interface{}) (result int) PutIfAbsent(key string, val interface {}) (result int) PutIfExists(key string, val interface{}) (result int) Remove(key string) (result int) ForEach(consumer Consumer) // 传入是方法, 返回 bool = true j继续遍历 Keys() []string RandomKeys(limit int) []string // 返回多个不重复的键 RandomDistinctKeys(limit int) []string Clear() } // Consumer is used to traversal dict, if it returns false the traversal will be break type Consumer func(key string, val interface{}) bool
package dict // SimpleDict wraps a map, it is not thread safe type SimpleDict struct { m map[string]interface{} } // MakeSimple makes a new map func MakeSimple() *SimpleDict { return &SimpleDict{ m: make(map[string]interface{}), } } // Get returns the binding value and whether the key is exist func (dict *SimpleDict) Get(key string) (val interface{}, exists bool) { val, ok := dict.m[key] return val, ok } // Len returns the number of dict func (dict *SimpleDict) Len() int { if dict.m == nil { panic("m is nil") } return len(dict.m) } // Put puts key value into dict and returns the number of new inserted key-value func (dict *SimpleDict) Put(key string, val interface{}) (result int) { _, existed := dict.m[key] dict.m[key] = val if existed { return 0 } return 1 } // PutIfAbsent puts value if the key is not exists and returns the number of updated key-value func (dict *SimpleDict) PutIfAbsent(key string, val interface{}) (result int) { _, existed := dict.m[key] if existed { return 0 } dict.m[key] = val return 1 } // PutIfExists puts value if the key is exist and returns the number of inserted key-value func (dict *SimpleDict) PutIfExists(key string, val interface{}) (result int) { _, existed := dict.m[key] if existed { dict.m[key] = val return 1 } return 0 } // Remove removes the key and return the number of deleted key-value func (dict *SimpleDict) Remove(key string) (result int) { _, existed := dict.m[key] delete(dict.m, key) if existed { return 1 } return 0 } // Keys returns all keys in dict func (dict *SimpleDict) Keys() []string { result := make([]string, len(dict.m)) i := 0 for k := range dict.m { result[i] = k } return result } // ForEach traversal the dict func (dict *SimpleDict) ForEach(consumer Consumer) { for k, v := range dict.m { if !consumer(k, v) { break } } } // RandomKeys randomly returns keys of the given number, may contain duplicated key func (dict *SimpleDict) RandomKeys(limit int) []string { result := make([]string, limit) for i := 0; i < limit; i++ { for k := range dict.m { result[i] = k break } } return result } // RandomDistinctKeys randomly returns keys of the given number, won't contain duplicated key func (dict *SimpleDict) RandomDistinctKeys(limit int) []string { size := limit if size > len(dict.m) { size = len(dict.m) } result := make([]string, size) i := 0 for k := range dict.m { if i == limit { break } result[i] = k i++ } return result } // Clear removes all keys in dict func (dict *SimpleDict) Clear() { *dict = *MakeSimple() }
Redis的DB实现
针对其中 Command 的实现, ping, set ,Get 有具体的实现方法, 采用策略模式形式, 不同的方法 有自己的 Executor 执行器
// 指令与结构体之间的关系 var cmdTable = make(map[string]*command) type command struct { executor ExecFunc arity int // allow number of args, arity < 0 means len(args) >= -arity } // RegisterCommand registers a new command // arity means allowed number of cmdArgs, arity < 0 means len(args) >= -arity. // for example: the arity of `get` is 2, `mget` is -2 func RegisterCommand(name string, executor ExecFunc, arity int) { name = strings.ToLower(name) cmdTable[name] = &command{ executor: executor, arity: arity, } }
// Package database is a memory database with redis compatible interface package database import ( "go-redis/datastruct/dict" "go-redis/interface/database" "go-redis/interface/resp" "go-redis/resp/reply" "strings" ) // DB stores data and execute user's commands type DB struct { index int // key -> DataEntity data dict.Dict // 底层的实现可以进行切换 } // ExecFunc is interface for command executor, redis 的 指令实现, 所有的实现 协程这种形式 // args don't include cmd line type ExecFunc func(db *DB, args [][]byte) resp.Reply // CmdLine is alias for [][]byte, represents a command line type CmdLine = [][]byte // makeDB create DB instance func makeDB() *DB { db := &DB{ data: dict.MakeSyncDict(), // 使用的 sync 的 dict } return db } // Exec executes command within one database func (db *DB) Exec(c resp.Connection, cmdLine [][]byte) resp.Reply { cmdName := strings.ToLower(string(cmdLine[0])) cmd, ok := cmdTable[cmdName] if !ok { return reply.MakeErrReply("ERR unknown command '" + cmdName + "'") } if !validateArity(cmd.arity, cmdLine) { return reply.MakeArgNumErrReply(cmdName) } fun := cmd.executor return fun(db, cmdLine[1:]) } func validateArity(arity int, cmdArgs [][]byte) bool { argNum := len(cmdArgs) if arity >= 0 { return argNum == arity } return argNum >= -arity } /* ---- data Access ----- */ // GetEntity returns DataEntity bind to given key func (db *DB) GetEntity(key string) (*database.DataEntity, bool) { raw, ok := db.data.Get(key) if !ok { return nil, false } entity, _ := raw.(*database.DataEntity) return entity, true } // PutEntity a DataEntity into DB func (db *DB) PutEntity(key string, entity *database.DataEntity) int { return db.data.Put(key, entity) } // PutIfExists edit an existing DataEntity func (db *DB) PutIfExists(key string, entity *database.DataEntity) int { return db.data.PutIfExists(key, entity) } // PutIfAbsent insert an DataEntity only if the key not exists func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int { return db.data.PutIfAbsent(key, entity) } // Remove the given key from db func (db *DB) Remove(key string) { db.data.Remove(key) } // Removes the given keys from db func (db *DB) Removes(keys ...string) (deleted int) { deleted = 0 for _, key := range keys { _, exists := db.data.Get(key) if exists { db.Remove(key) deleted++ } } return deleted } // Flush clean database func (db *DB) Flush() { db.data.Clear() }
具体的实现器
// Ping Executor func Ping(db *DB, args [][]byte) resp.Reply { if len(args) == 0 { return &reply.PongReply{} } else if len(args) == 1 { return reply.MakeStatusReply(string(args[0])) } else { return reply.MakeErrReply("ERR wrong number of arguments for 'ping' command") } } // 初始化 方法 func init() { RegisterCommand("ping", Ping, -1) }
Redis持久化Aof
AOF 则以协议文本的方式,将所有对数据库进行过写入的命令(及其参数)记录到 AOF 文件,以此达到记录数据库状态的目的.
package aof import ( "go-redis/config" databaseface "go-redis/interface/database" "go-redis/lib/logger" "go-redis/lib/utils" "go-redis/resp/connection" "go-redis/resp/parser" "go-redis/resp/reply" "io" "os" "strconv" ) // CmdLine is alias for [][]byte, represents a command line type CmdLine = [][]byte const ( aofQueueSize = 1 << 16 ) type payload struct { cmdLine CmdLine dbIndex int } // AofHandler receive msgs from channel and write to AOF file type AofHandler struct { db databaseface.Database aofChan chan *payload aofFile *os.File aofFilename string currentDB int } // NewAOFHandler creates a new aof.AofHandler func NewAOFHandler(db databaseface.Database) (*AofHandler, error) { handler := &AofHandler{} handler.aofFilename = config.Properties.AppendFilename handler.db = db // init Load Aof to Memory handler.LoadAof() aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { return nil, err } handler.aofFile = aofFile handler.aofChan = make(chan *payload, aofQueueSize) go func() { handler.handleAof() }() return handler, nil } func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) { if config.Properties.AppendOnly && handler.aofChan != nil { handler.aofChan <- &payload{ cmdLine: cmdLine, dbIndex: dbIndex, } } } // LoadAof read aof file 数据回放的功能 func (handler *AofHandler) LoadAof() { file, err := os.Open(handler.aofFilename) if err != nil { logger.Warn(err) return } defer func(file *os.File) { err := file.Close() if err != nil { } }(file) ch := parser.ParseStream(file) fakeConn := &connection.Connection{} // only used for save dbIndex // LoadAof recover data for p := range ch { if p.Err != nil { if p.Err == io.EOF { break } logger.Error("parse error: " + p.Err.Error()) continue } if p.Data == nil { logger.Error("empty payload") continue } r, ok := p.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } ret := handler.db.Exec(fakeConn, r.Args) if reply.IsErrorReply(ret) { logger.Error("exec err", err) } } } // handleAof listen aof channel and write into file aof 异步形式的落盘 func (handler *AofHandler) handleAof() { handler.currentDB = 0 for p := range handler.aofChan { if p.dbIndex != handler.currentDB { // select db data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes() _, err := handler.aofFile.Write(data) if err != nil { logger.Warn(err) continue // skip this command } handler.currentDB = p.dbIndex } data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes() _, err := handler.aofFile.Write(data) if err != nil { logger.Warn(err) } } }
加载全部内容