4.5 散列

我们之前研究了Nginx数组和链表的实现,这两种数据结构一种插入消耗大,一种查找效率低。散列是一种可以在O(1)的时间复杂度下快速查找的数据结构,它是以空间换时间的数据结构。我们可将散列的key通过一定的方法映射到散列表中的某一位置。当然,散列可能会出现key冲突。解决冲突的两种方法是拉链法和开放地址法。

拉链法解决散列冲突的原理是,当key的散列值出现冲突时,将相同的散列值组成一个链表。当查找这些冲突的数据时,通过遍历这个链表找到相应的数据。拉链法一般用头插法来实现,这里不再展开描述。

Nginx采用开放地址法解决散列冲突。当key的散列值出现冲突时,则向后遍历,查看散列值+1的位置是否有值存在,如果无值则会占用这个位置,如果有值则继续向后遍历。在查找数据时,如果遇到的散列值不是想查找的数据,则向后遍历,直到找到相应的数据。

Nginx散列相关的数据结构如下:

typedef struct {
    void             *value;
    u_short           len;
    u_char            name[1];
} ngx_hash_elt_t;

代码中的参数说明如下。

  • value:指向散列value的值。
  • len:散列key的长度。
  • name:柔性数组,散列key的值。

ngx_hash_elt_t是散列的结构体,其中散列的key是结构体的name字段,散列的value是结构体的value字段。该结构体如下:

typedef struct {
    ngx_hash_elt_t  **buckets;
    ngx_uint_t        size;
} ngx_hash_t;

代码中的参数说明如下。

  • buckets:桶数组,数组中每个数据的类型为ngx_hash_elt_t*,buckets指向当前桶的第一个散列数据。
  • size:散列桶的数量。

所有的散列数据连续存储在一个字节数组buckets中。当散列冲突时,一个散列位置需要存储多个散列数据,这时候如何处理呢?我们观察ngx_hash_elt_t结构体,很容易知道数据的长度,所以多个散列数据在冲突位置按顺序存储即可。为了使用方便,每个桶的最后都有一个8字节的NULL。数据在散列中的存储示例如图4-6所示。

图4-6 数据在散列中的存储示例

ngx_hash_init_t结构用于提供创建散列所需要的信息:

typedef struct {
    ngx_hash_t       *hash;
    ngx_hash_key_pt   key;
    ngx_uint_t        max_size;
    ngx_uint_t        bucket_size;
    char             *name;
    ngx_pool_t       *pool;
    ngx_pool_t       *temp_pool;
} ngx_hash_init_t;

代码中的参数说明如下。

  • hash:指向散列。
  • key:散列方法,它是一个函数指针。
  • max_size:散列表中桶的最大数量。
  • bucket_size:每个桶的最大字节大小。
  • name:散列的名称。
  • pool:散列所在的内存池。
  • temp_tool:构建散列时所用的临时内存池,创建散列完成时被回收。

Nginx用ngx_hash_init函数来初始化散列,函数定义如下:

ngx_int_t ngx_hash_init(ngx_hash_init_t *hinit, ngx_hash_key_t *names, ngx_uint_t nelts);

Nginx的散列在初始化完成之后便不会再修改,只能用于查询操作。因此创建散列时就必须预先计算好合适的桶数目、每个桶的字节大小以及每个桶存储哪些数据等。在初始化散列之前,我们需要一个ngx_hash_init_t结构体指针变量hinit。hinit变量已经将结构体各变量初始化,而ngx_hash_init函数的作用就是将nelts个存储在names里的key-value添加到hinit变量的哈希表里。

由于散列的每个桶的最后都有一个8字节的NULL,因此在初始化散列的时候,需要判断每个数据加8字节是否大于散列的bucket_size,如果大于,则表示散列无法存下数据。这是异常情况,不能继续初始化,代码如下:

for (n = 0; n < nelts; n++) {
    if (hinit->bucket_size < NGX_HASH_ELT_SIZE(&names[n]) + sizeof(void *))
    {
        ...
        return NGX_ERROR;
    }
}

其中,NGX_HASH_ELT_SIZE用于计算散列数据的大小,代码如下:

#define NGX_HASH_ELT_SIZE(name)                            \
    (sizeof(void *) + ngx_align((name)->key.len + 2, sizeof(void *)))

由ngx_hash_elt_t的定义我们知道:它有一个void*元素,指向数据的实际值;它的name字段是一个柔性数组,柔性数组的长度在赋值时才会知道;它的长度用u_short表示,长度为2。为了内存对齐操作方便,通过ngx_align使数据为8字节的整数倍,这样就求出了一个数据的大小。

在判断完所有数据的大小都满足要求后,需要计算合适的桶数目,桶数目已知后,便可以计算出每个散列数据的桶索引。计算合适的桶数目需要遍历,首先会粗略计算出一个最小桶数目start、最大桶数目max_size,在此范围内再精确计算桶最终数目。每个散列数据最小为16字节,因此可按如下方式计算最小桶数目(每个桶最后都有8字节的NULL表示桶结束):

bucket_size = hinit->bucket_size - sizeof(void *);
start = nelts / (bucket_size / (2 * sizeof(void *)));
start = start ? start : 1;

这里还需要统计每个桶存储的散列数据长度之和,因此可以先分配一个test数组:

test = ngx_alloc(hinit->max_size * sizeof(u_short), hinit->pool->log);

这里先用test数组来实现桶数目的精确计算,每个数据都应该存放在散列中,且每个桶的大小不能大于bucket_size。当所有条件都满足后,得到的size值即为最终确定的精确桶数目。

for (size = start; size <= hinit->max_size; size++) {
    ngx_memzero(test, size * sizeof(u_short));
    for (n = 0; n < nelts; n++) {
        ...
        key = names[n].key_hash % size;
        test[key] = (u_short) (test[key] + NGX_HASH_ELT_SIZE(&names[n]));
        if (test[key] > (u_short) bucket_size) {
            goto next;
        }
    }
    goto found;
next:
    continue;
}

求出size之后便可以计算出散列的总长度。

for (i = 0; i < size; i++) {
    test[i] = sizeof(void *);// 每个桶都有一个8字节的NULL指针
}
for (n = 0; n < nelts; n++) {
    ...
    key = names[n].key_hash % size;
    test[key] = (u_short) (test[key] + NGX_HASH_ELT_SIZE(&names[n]));
}

桶的数目和散列的总长度确定好之后,就可以在内存池上分配存储空间了。注意,这里在分配空间时是按照ngx_cacheline_size结构体进行字节对齐的。CPU在加载内存中数据到高速缓存时是一次性加载一个数据块。数据块的大小为cacheline_size,通常为64字节。分配内存时,按照ngx_cacheline_size结构体字节对齐的方式加载到高速缓存的效率较高。

for (i = 0; i < size; i++) {
    ...
    test[i] = (u_short) (ngx_align(test[i], ngx_cacheline_size));
    ...
}
elts = ngx_palloc(hinit->pool, len + ngx_cacheline_size);

前面已经计算出每个桶所有数据总长度,因此这里可以将桶指针指向每个桶第一个数据首地址:

for (i = 0; i < size; i++) {
    ...
    buckets[i] = (ngx_hash_elt_t *) elts;
    elts += test[i];
}

以上所有准备工作就绪之后,我们就可以将散列数据存放到桶中相应的位置了:

for (n = 0; n < nelts; n++) {
    ...
    key = names[n].key_hash % size;
    elt = (ngx_hash_elt_t *) ((u_char *) buckets[key] + test[key]);
    elt->value = names[n].value;
    elt->len = (u_short) names[n].key.len;
    ngx_strlow(elt->name, names[n].key.data, names[n].key.len);
    test[key] = (u_short) (test[key] + NGX_HASH_ELT_SIZE(&names[n]));
}

当然,每个桶最后会添加8字节的NULL作为结束标志:

for (i = 0; i < size; i++) {
    ...
    elt = (ngx_hash_elt_t *) ((u_char *) buckets[i] + test[i]);
    elt->value = NULL;
}

将确定好的桶数量和长度赋值给散列数据结构中的变量:

hinit->hash->buckets = buckets;
hinit->hash->size = size;

通过以上步骤,我们便完成了对散列桶数目的确定并将每个数据添加到桶的相应位置。

Nginx散列是通过开放地址法来解决散列冲突的,所以在查找数据时,首先需要确定好数据所在的桶位置。

elt = hash->buckets[key % hash->size];

知道了桶的位置后,便可以通过遍历桶上的所有数据来找到相应的数据,这比较简单,不再赘述。