项目作者: irealing

项目描述 :
red-cache base on asyncio version
高级语言: Python
项目地址: git://github.com/irealing/red-helper.git
创建时间: 2020-05-21T15:06:32Z
项目社区:https://github.com/irealing/red-helper

开源协议:MIT License

下载


red-helper

Redis缓存工具red-cache 的 asyncio 版本.

安装

```shell script
$ pip install red-helper

  1. ## 示例
  2. ### 初始化
  3. ```python
  4. import red_helper
  5. red_helper.new("redis://redis:6379",db=0)

或:

  1. from red_helper import RedHelper
  2. from aredis import StrictRedis
  3. redis=StrictRedis(**{})
  4. helper=RedHelper(redis)

一般操作

  1. import red_helper
  2. import asyncio
  3. helper = red_helper.new("redis://redis", db=0)
  4. async def simple_operations():
  5. # 设置
  6. await helper.set("hello", "world", ex=180)
  7. # 查询字段
  8. print(await helper.get("hello", default_value="WORLD!"))
  9. # 删除字段
  10. await helper.delete("hello")
  11. if __name__ == '__main__':
  12. asyncio.get_event_loop().run_until_complete(simple_operations())

Hash

  1. import asyncio
  2. import red_helper
  3. helper = red_helper.new("redis://redis", db=0)
  4. hs = helper.red_hash("red::hash")
  5. async def simple_operations():
  6. # 设置
  7. await hs.set("hello", "world", ex=180)
  8. # 查询字段
  9. print(await hs.get("hello", default_value="WORLD!"))
  10. # 删除字段
  11. await hs.delete("hello")
  12. if __name__ == '__main__':
  13. asyncio.get_event_loop().run_until_complete(simple_operations())

缓存

  1. import asyncio
  2. import red_helper
  3. helper = red_helper.new("redis://redis", db=0)
  4. # 缓存函数返回值
  5. @helper.cache_it(lambda asset_id: "asset::cache:key:{}".format(asset_id), ttl=180)
  6. async def read_data(asset_id: int) -> dict:
  7. await asyncio.sleep(0.1)
  8. return dict(zip(range(10), range(10)))
  9. # 删除缓存
  10. @helper.remove_it(lambda asset_id: "asset::cache:key:{}".format(asset_id), by_return=True)
  11. async def update_date(asset_id: int) -> int:
  12. await asyncio.sleep(0.1)
  13. return asset_id
  14. async def main():
  15. await read_data(10)
  16. await update_date(10)
  17. if __name__ == '__main__':
  18. asyncio.get_event_loop().run_until_complete(main())

基于HASH的缓存

  1. import asyncio
  2. import red_helper
  3. helper = red_helper.new("redis://redis", db=0)
  4. hs = helper.red_hash("red::hash")
  5. # 缓存函数返回值
  6. @hs.cache_it(lambda asset_id: "asset::cache:key:{}".format(asset_id), ttl=180)
  7. async def read_data(asset_id: int) -> dict:
  8. await asyncio.sleep(0.1)
  9. return dict(zip(range(10), range(10)))
  10. # 删除缓存
  11. @hs.remove_it(lambda asset_id: "asset::cache:key:{}".format(asset_id), by_return=True)
  12. async def update_date(asset_id: int) -> int:
  13. await asyncio.sleep(0.1)
  14. return asset_id
  15. async def main():
  16. await read_data(10)
  17. await update_date(10)
  18. if __name__ == '__main__':
  19. asyncio.get_event_loop().run_until_complete(main())

author:Memory_Leak