3.4.4 Redis缓存实现

现在,我们已经准备好使用与之前 DiskCache 类相同的类接口构建Redis缓存了。

import json
from datetime import timedelta
from redis import StrictRedis
class RedisCache:
    def __init__(self, client=None, expires=timedelta(days=30),
        # if a client object is not passed then try
        # connecting to redis at the default localhost port
        self.client = StrictRedis(host='localhost', port=6379, db=0)
            if client is None else client
        self.expires = expires
        self.encoding = encoding
    def __getitem__(self, url):
        """Load value from Redis for the given URL"""
        record = self.client.get(url)
        if record:
            return json.loads(record.decode(self.encoding))
            raise KeyError(url + ' does not exist')
    def __setitem__(self, url, result):
        """Save value in Redis for the given URL"""
        data = bytes(json.dumps(result), self.encoding)
        self.client.setex(url, self.expires, data)

这里的 __getitem____setitem__ 方法与前一节中关于如何在Redis中获取及设置键的讨论很相似,不过在这里我们使用了 json 模块控制序列化,并使用了 setex 方法,能够使我们在设置键值时附带过期时间。 setex 既可以接受 datetime.timedelta ,也可以接受以秒为单位的数值。这是一个非常方便的Redis功能,可以在指定秒数后自动删除记录。这就意味着我们不再需要像 DiskCache 类那样手工检查记录是否在我们的过期规则内。让我们使用20秒的时间差在IPython中进行尝试,观察缓存过期。

In [1]: from chp3.rediscache import RedisCache
In [2]: from datetime import timedelta
In [3]: cache = RedisCache(expires=timedelta(seconds=20))
In [4]: cache['test'] = {'html': '...', 'code': 200}
In [5]: cache['test']
Out[5]: {'code': 200, 'html': '...'}
In [6]: import time; time.sleep(20)
In [7]: cache['test']
KeyError Traceback (most recent call last)
KeyError: 'test does not exist'
