哈希表
dictht结构体定义了一个哈希表,保存着指向dictEntry的指针的数组。通过使用链表来解决哈希冲突。
dict结构体是字典的结构体,每个字典有两个哈希表,便于在rehash的时候依然可以进行遍历操作。dict使用safe值来标记当前是否是安全状态,用table值标记当前被迭代的哈希表号码。
渐进式rehash
当哈希表很大的时候,不能一下子将所有的键都哈希完成,这时候就需要让字典同时持有两个哈希表,然后用rehashidx标记哈希的进度。每次对字典进行添加、删除、查找或者更新操作的时候顺便将rehashidx上的键值rehash到ht[1]。在进行添加的时候,直接将值添加到ht[1]中,以保证不会遗漏值。
跳跃表
跳跃表是可以和平衡树相媲美的数据结构,但是结构比平衡树简单。缺点是比较占用内存。Redis虽然是单线程,但是在以后可能需要多线程,多线程情况下平衡树的再平衡会影响更多的节点,导致更激烈的锁竞争。
Redis只在有序集合和集群节点中用到了这个数据结构。
在Redis中每个节点的层数是根据幂次定律(越大的数出现的概率越小)随机生成的(1~32),然后从表头依次连接每一层,就成为一个跳跃表了。其中的跨度只是单纯为了计算排位的。
压缩列表
Redis自己实现的一种压缩型列表,entry节点之间都紧密排列,每个entry节点保存前面一个节点的内存大小、自己节点的编码和内容。
由于内容长度的不同,previous_entry_length可能是1字节也可能是5字节。
对于不同的编码类型,使用专门的代码编号指代。
在entry被修改的时候,可能导致内容长度改变进而导致previous_entry_length的长度改变,从而后续的entry。有可能导致整个表中previous_entry_length都因为无法表示而改变长度。
压缩列表在添加/删除节点的时候需要realloc内存,所以效率不是特别高。
整数集合(intset)
当集合的元素不多且都是整数值元素时,使用的是intset。
结构体包括:编码方式、元素数量与保存元素的数组这三个实例。虽然contents是int8_t类型的,但是实际上是根据encoding尺寸来读取contents内容。
对象
Redis不直接操作数据结构,而是通过对象来操作这些数据结构。在不同的场景下可以使用不同的数据结构来实现对象。
Redis对象还使用引用计数进行内存回收与对象共享。下面是编码:
// 对象编码
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */
#define REDIS_ENCODING_HT 2 /* Encoded as hash table */
#define REDIS_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define REDIS_ENCODING_INTSET 6 /* Encoded as intset */
#define REDIS_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define REDIS_ENCODING_EMBSTR 8 /* Embedded sds string encoding */
REDIS_ENCODING_RAW是简单的对象,就是将ptr指向的数据结构包装成一个对象。
/*
* 创建一个新 robj 对象
*/
robj *createObject(int type, void *ptr) {
robj *o = zmalloc(sizeof(*o));
o->type = type;
o->encoding = REDIS_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;
/* Set the LRU to the current lruclock (minutes resolution). */
o->lru = LRU_CLOCK();
return o;
}
REDIS_ENCODING_HT可以用来创建REDIS_HASH或者REDIS_SET。下面是用REDIS_ENCODING_HT的编码创建REDIS_SET的源代码,也很简单,实际上是利用了dictCreate创建一个dict基本类型,然后利用createObject方法创建一个robj,然后修改encoding为REDIS_ENCODING_HT即可:
/*
* 创建一个 SET 编码的集合对象
*/
robj *createSetObject(void) {
dict *d = dictCreate(&setDictType,NULL);
robj *o = createObject(REDIS_SET,d);
o->encoding = REDIS_ENCODING_HT;
return o;
}
字符串对象
字符串对象可以使用int、raw或者embstr编码来实现。当保存的是整数值时,用INT编码。当保存的是一个大于32字节的字符串,用RAW编码。当保存的是小于32字节的非整数值,使用EMBSTR编码。
REDIS_ENCODING_INT编码类型可以实现REDIS_STRING对象。在Redis中有共享整数池,如果需要的是这样的整数,可以直接返回共享整数。下面是分为三种情况根据整数值创建REDIS_STRING对象的方法:
/*
* 根据传入的整数值,创建一个字符串对象
*
* 这个字符串的对象保存的可以是 INT 编码的 long 值,
* 也可以是 RAW 编码的、被转换成字符串的 long long 值。
*/
robj *createStringObjectFromLongLong(long long value) {
robj *o;
// value 的大小符合 REDIS 共享整数的范围
// 那么返回一个共享对象
if (value >= 0 && value < REDIS_SHARED_INTEGERS) {
incrRefCount(shared.integers[value]);
o = shared.integers[value];
// 不符合共享范围,创建一个新的整数对象
} else {
// 值可以用 long 类型保存,
// 创建一个 REDIS_ENCODING_INT 编码的字符串对象
if (value >= LONG_MIN && value <= LONG_MAX) {
o = createObject(REDIS_STRING, NULL);
o->encoding = REDIS_ENCODING_INT;
o->ptr = (void*)((long)value);
// 值不能用 long 类型保存(long long 类型),将值转换为字符串,
// 并创建一个 REDIS_ENCODING_RAW 的字符串对象来保存值
} else {
o = createObject(REDIS_STRING,sdsfromlonglong(value));
}
}
return o;
}
另外,Redis还有一个createStringObjectFromLongDouble方法,将double类型的数字转换成char数组,并包装成EMBSTR或者RAW编码的robj(在包装成robj的过程中,还是变成了SDS类型)。
下面是创建使用EMBSTR和RAW编码创建REDIS_STRING的调用方法:
robj *createStringObject(char *ptr, size_t len) {
if (len <= REDIS_ENCODING_EMBSTR_SIZE_LIMIT)
return createEmbeddedStringObject(ptr,len);
else
return createRawStringObject(ptr,len);
}
REDIS_ENCODING_EMBSTR是用SDS实现的,下面的代码创建EmbeddedStringObject,输入的ptr是需要创建的字符串的指针。
// 创建一个 REDIS_ENCODING_EMBSTR 编码的字符对象
// 这个字符串对象中的 sds 会和字符串对象的 redisObject 结构一起分配
// 因此这个字符也是不可修改的
robj *createEmbeddedStringObject(char *ptr, size_t len) {
// 将robj和sdshdr的内存绑定在一起分配,分配好之后返回的是robj的指针。
robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr)+len+1);
// o+1是跳过一个robj结构体的长度,返回的是sdshdr的头指针。
struct sdshdr *sh = (void*)(o+1);
o->type = REDIS_STRING;
o->encoding = REDIS_ENCODING_EMBSTR;
o->ptr = sh+1;
o->refcount = 1;
o->lru = LRU_CLOCK();
sh->len = len;
sh->free = 0;
if (ptr) {
memcpy(sh->buf,ptr,len);
sh->buf[len] = '\0';
} else {
memset(sh->buf,0,len+1);
}
return o;
}
使用RAW编码创建STRING对象,ptr指向的也是SDS,不同的是现在SDS和STRING对象的内存不绑定在一起了:
/* Create a string object with encoding REDIS_ENCODING_RAW, that is a plain
* string object where o->ptr points to a proper sds string. */
// 创建一个 REDIS_ENCODING_RAW 编码的字符对象
// 对象的指针指向一个 sds 结构
robj *createRawStringObject(char *ptr, size_t len) {
return createObject(REDIS_STRING,sdsnewlen(ptr,len));
}
编码的转换
在对long double类型进行操作的时候,程序会先将字符串转换回浮点数,进行修改之后再保存到字符串中。如果操作之后的结果不是整型或者long double了,那么就只能以RAW编码来保存了。
由于EMBSTR的内存是绑定的,无法修改,所以对EMBSTR进行操作之后,对象的编码就从EMBSTR转变成RAW了(上面的long double实际上是EMBSTR类型,所以和这里的情况相同,只能改编成RAW编码)。
列表对象
列表对象的编码可以使ziplist或者linkedlist。ziplist内存紧凑,遍历迅速,但是可能会产生连锁修改的情况,修改元素也会导致内存申请,且在需要存储大量数据的时候不一定能申请到合适的内存。而linkedlist遍历速度慢,但是由于是链表,可以利用碎片内存。
对于RPUSH numbers 1 “three” 5这样的命令,看看不同编码的存储结果:在ziplist编码的时候,每个entry都得以有自己的编码,1以整型存储,“trhee"以字节数组形式存储。而linkedlist是统一编码,所以1这样的整数只能也以String的类型存在了。
列表对象默认在以下情况都满足的情况下使用ziplist,否则就使用linkedlist:
- 所有字符串长度都小于64字节
- 保存的元素数量小于512个
如果ziplist在修改的过程中不再满足以上的条件,也会被转换成linkedlist。
下面分别是使用linkedlist和ziplist创建List对象的代码:
/*
* 创建一个 LINKEDLIST 编码的列表对象
*/
robj *createListObject(void) {
list *l = listCreate();
robj *o = createObject(REDIS_LIST,l);
listSetFreeMethod(l,decrRefCountVoid);
o->encoding = REDIS_ENCODING_LINKEDLIST;
return o;
}
/*
* 创建一个 ZIPLIST 编码的列表对象
*/
robj *createZiplistObject(void) {
unsigned char *zl = ziplistNew();
robj *o = createObject(REDIS_LIST,zl);
o->encoding = REDIS_ENCODING_ZIPLIST;
return o;
}
从listTypePush方法可以看到,在插入元素之前都会尝试转换编码:
/* The function pushes an element to the specified list object 'subject',
* at head or tail position as specified by 'where'.
*
* 将给定元素添加到列表的表头或表尾。
*
* 参数 where 决定了新元素添加的位置:
*
* - REDIS_HEAD 将新元素添加到表头
*
* - REDIS_TAIL 将新元素添加到表尾
*
* There is no need for the caller to increment the refcount of 'value' as
* the function takes care of it if needed.
*
* 调用者无须担心 value 的引用计数,因为这个函数会负责这方面的工作。
*/
void listTypePush(robj *subject, robj *value, int where) {
/* Check if we need to convert the ziplist */
// 是否需要转换编码?
listTypeTryConversion(subject,value);
if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
// ZIPLIST
if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
// 取出对象的值,因为 ZIPLIST 只能保存字符串或整数
value = getDecodedObject(value);
subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
decrRefCount(value);
// 双端链表
} else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
if (where == REDIS_HEAD) {
listAddNodeHead(subject->ptr,value);
} else {
listAddNodeTail(subject->ptr,value);
}
incrRefCount(value);
// 未知编码
} else {
redisPanic("Unknown list encoding");
}
}
/* Check the argument length to see if it requires us to convert the ziplist
* to a real list. Only check raw-encoded objects because integer encoded
* objects are never too long.
*
* 对输入值 value 进行检查,看是否需要将 subject 从 ziplist 转换为双端链表,
* 以便保存值 value 。
*
* 函数只对 REDIS_ENCODING_RAW 编码的 value 进行检查,
* 因为整数编码的值不可能超长。
* 从代码逻辑可以清楚发现转换成linkedlist编码的条件
*/
void listTypeTryConversion(robj *subject, robj *value) {
// 确保 subject 为 ZIPLIST 编码
if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
if (sdsEncodedObject(value) &&
// 看字符串是否过长
sdslen(value->ptr) > server.list_max_ziplist_value)
// 将编码转换为双端链表
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
}
将ziplist编码转换成linkedlist编码实际就是遍历ziplist并添加到linkedlist中:
/*
* 将列表的底层编码从 ziplist 转换成双端链表
*/
void listTypeConvert(robj *subject, int enc) {
listTypeIterator *li;
listTypeEntry entry;
redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);
// 转换成双端链表
if (enc == REDIS_ENCODING_LINKEDLIST) {
list *l = listCreate();
listSetFreeMethod(l,decrRefCountVoid);
/* listTypeGet returns a robj with incremented refcount */
// 遍历 ziplist ,并将里面的值全部添加到双端链表中
li = listTypeInitIterator(subject,0,REDIS_TAIL);
while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
listTypeReleaseIterator(li);
// 更新编码
subject->encoding = REDIS_ENCODING_LINKEDLIST;
// 释放原来的 ziplist
zfree(subject->ptr);
// 更新对象值指针
subject->ptr = l;
} else {
redisPanic("Unsupported list conversion");
}
}
哈希对象
哈希对象用ziplist或者hashtable实现。当键值对少于512并且键/值长度小于64字节的时候使用ziplist实现。hashtable的底层实现是字典。
哈希对象的ziplist实现:将键和值相邻存储在ziplist中,当需要增加键值的时候,先将键存放到ziplist尾,再将值存放到ziplist尾。当ziplist的条件不满足的时候,就会使用字典来实现哈希对象了。
下面是t_hash中set的逻辑代码,主要的逻辑都在处理ziplist的情况——首先遍历ziplist看有没有这个键,如果有就替代掉;如果没有就将键和值插入到最后。然后尝试转码成hashtable,如果不满足条件就直接转码。而hashtable实现则简单了许多,直接尝试Replace,如果替换成功则返回1,替换失败就返回0.
/* Add an element, discard the old if the key already exists.
* Return 0 on insert and 1 on update.
*
* 将给定的 field-value 对添加到 hash 中,
* 如果 field 已经存在,那么删除旧的值,并关联新值。
*
* This function will take care of incrementing the reference count of the
* retained fields and value objects.
*
* 这个函数负责对 field 和 value 参数进行引用计数自增。
*
* 返回 0 表示元素已经存在,这次函数调用执行的是更新操作。
*
* 返回 1 则表示函数执行的是新添加操作。
*/
int hashTypeSet(robj *o, robj *field, robj *value) {
int update = 0;
// 添加到 ziplist
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl, *fptr, *vptr;
// 解码成字符串或者数字
field = getDecodedObject(field);
value = getDecodedObject(value);
// 遍历整个 ziplist ,尝试查找并更新 field (如果它已经存在的话)
zl = o->ptr;
fptr = ziplistIndex(zl, ZIPLIST_HEAD);
if (fptr != NULL) {
// 定位到域 field
fptr = ziplistFind(fptr, field->ptr, sdslen(field->ptr), 1);
if (fptr != NULL) {
/* Grab pointer to the value (fptr points to the field) */
// 定位到域的值
vptr = ziplistNext(zl, fptr);
redisAssert(vptr != NULL);
// 标识这次操作为更新操作
update = 1;
/* Delete value */
// 删除旧的键值对
zl = ziplistDelete(zl, &vptr);
/* Insert new value */
// 添加新的键值对
zl = ziplistInsert(zl, vptr, value->ptr, sdslen(value->ptr));
}
}
// 如果这不是更新操作,那么这就是一个添加操作
if (!update) {
/* Push new field/value pair onto the tail of the ziplist */
// 将新的 field-value 对推入到 ziplist 的末尾
zl = ziplistPush(zl, field->ptr, sdslen(field->ptr), ZIPLIST_TAIL);
zl = ziplistPush(zl, value->ptr, sdslen(value->ptr), ZIPLIST_TAIL);
}
// 更新对象指针
o->ptr = zl;
// 释放临时对象
decrRefCount(field);
decrRefCount(value);
/* Check if the ziplist needs to be converted to a hash table */
// 检查在添加操作完成之后,是否需要将 ZIPLIST 编码转换成 HT 编码
if (hashTypeLength(o) > server.hash_max_ziplist_entries)
hashTypeConvert(o, REDIS_ENCODING_HT);
// 添加到字典
} else if (o->encoding == REDIS_ENCODING_HT) {
// 添加或替换键值对到字典
// 添加返回 1 ,替换返回 0
if (dictReplace(o->ptr, field, value)) { /* Insert */
incrRefCount(field);
} else { /* Update */
update = 1;
}
incrRefCount(value);
} else {
redisPanic("Unknown hash encoding");
}
// 更新/添加指示变量
return update;
}
集合对象
集合对象的编码可以是intset或者hashtable。
在元素都是整数且数量少于512的时候,集合对象使用intset。不满足这样的条件之后intset转为hashtable编码。
使用intset编码的时候,存储的都是整数类型。但是转换成hashtable编码的时候,元素被存储为StringObject类型的键,而值则是空,以此来保证元素的唯一性。
有序集合对象
有序集合对象的编码是ziplist或者skiplist。
在ziplist中保存内容的方式是先保存成员,再保存分值。使用ziplist的条件是:元素数量少于128个并且长度小于64字节(为什么这里就是128呢?明明也是键-值对,好在意..)。
skiplist的底层实现是zset,每个键都是StringObject对象,而值都是double对象。一个zset结构同时包括一个字典和一个跳跃表,他们使用的指针指向同一个的键、值对象,所以不会有多余的内存浪费和不一致问题。同时包括字典和跳跃表是为了提高效率:在查找某个成员的分值的时候使用字典,在进行排序、范围操作的时候使用跳跃表。
/*
* 有序集合
*/
typedef struct zset {
// 字典,键为成员,值为分值
// 用于支持 O(1) 复杂度的按成员取分值操作
dict *dict;
// 跳跃表,按分值排序成员
// 用于支持平均复杂度为 O(log N) 的按分值定位成员操作
// 以及范围操作
zskiplist *zsl;
} zset;
下面的代码是分别使用SKIPLIST和ZIPLIST编码创建有序集合的代码:
/*
* 创建一个 SKIPLIST 编码的有序集合
*/
robj *createZsetObject(void) {
// 创建ZSET
zset *zs = zmalloc(sizeof(*zs));
robj *o;
// 分别初始化dict和跳跃表
zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();
o = createObject(REDIS_ZSET,zs);
o->encoding = REDIS_ENCODING_SKIPLIST;
return o;
}
/*
* 创建一个 ZIPLIST 编码的有序集合
*/
robj *createZsetZiplistObject(void) {
unsigned char *zl = ziplistNew();
robj *o = createObject(REDIS_ZSET,zl);
o->encoding = REDIS_ENCODING_ZIPLIST;
return o;
}