博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【临实战】使用 Python 从 Redis 中删除 4000W 个 KEY
阅读量:6036 次
发布时间:2019-06-20

本文共 4249 字,大约阅读时间需要 14 分钟。

本文主要涉及 Redis 的以下两个操作和其 Python 实现,目录:
  • SCAN 命令
  • DEL 命令
  • 使用 Python SCAN
  • 使用 Python DEL
  • 成果展示

SCAN 命令

SCAN 命令及相关的 SSCAN、HSCAN 和 ZSCAN 命令都用于增量迭代(incrementally iterate)一个集合的元素(a collection of elements):
  • SCAN 用于迭代当前数据库中的数据库键
  • SSCAN 用于迭代集合键中的元素
  • HSCAN 用于迭代哈希键中的键值对
  • ZSCAN 用于迭代有序集合中的元素(包括元素分值和元素分值)
以上四列命令都支持增量迭代,每次执行都会返回少量元素,所以他们都可以用于生产环境,而不会出现像 KEYS、SMEMBERS 命令一样 -- 可能会阻塞服务器
不过,增量式迭代命令也不是没有缺点的:
举个例子,使用 SMEMBERS 命令可以返回集合键当前包含的所有元素,但是对于 SCAN 这类增量迭代命令来说,因为在堆键进行增量迭代的过程中,键可能会被改变,所以增量式迭代命令只能对被返回的元素提供有限的保证(offer limited guarantees about the returned elements)。
因为 SCAN、SSCAN、HSCAN 和 ZSCAN 命令的工作方式都非常相似,但是要记住:
  • SSCAN、HSCAN 和 ZSCAN 命令的第一个参数总是一个数据库键;
  • SCAN 命令则不需要在第一个参数提供任何数据库键 -- 因为它迭代的是当前数据库中的所有数据库键。
SCAN 命令的基本用法
SCAN 命令是一个基于游标的迭代器(cursor based iterator):
SCAN 命令每次被调用后,都会向用户返回一个新的游标,用户在下次迭代时需要使用这个新游标作为 SCAN 命令的游标参数,以此来延续之前的迭代过程。
当 SCAN 命令的游标参数被设置为 0 时,服务器开始一次新的迭代,而当服务器向用户返回值为 0 的游标时,表示迭代结束。
示例:
redis 127.0.0.1:6379> scan 01) "17"2)  1) "key:12"    2) "key:8"    3) "key:4"    4) "key:14"    5) "key:16"    6) "key:17"    7) "key:15"    8) "key:10"    9) "key:3"    10) "key:7"    11) "key:1"redis 127.0.0.1:6379> scan 171) "0"2) 1) "key:5"   2) "key:18"   3) "key:0"   4) "key:2"   5) "key:19"   6) "key:13"   7) "key:6"   8) "key:9"   9) "key:11"复制代码
上面的例子中,第一次迭代用 0 作为游标,表示开始第一次迭代。
第二次迭代使用第一次迭代时返回的游标,即:17。
从示例可以看出,SCAN 命令的返回是一个两个元素的数组,第一个元素是新游标,第二个元素也是一个数组,包含有所被包含的元素。
第二次调用 SCAN 命令时,返回游标 0,这表示迭代已经结束了,整个数据集(collection)已经被完整遍历过一遍了。
这个过程被称为一次完整遍历(full iteration)。
精简一下内容,补充三点:
  1. 因为 SCAN 命令仅仅使用游标来记录迭代状态,所以在迭代过程中,如果这个数据集的元素有增减,如果是减,不保证元素不返回;如果是增,也不保证一定返回;而且在某种情况下同一个元素还可能被返回多次。所以对迭代返回的元素所执行的操作最好可以重复执行多次(幂等)。
  2. 增量迭代命令不保证每次迭代所返回的元素数量(没扫到嘛),但是我们可以使用 COUNT 选项对命令的行为进行一定程度的调整。COUNT 参数的默认值为 10,在迭代一个足够大的、由哈希表实现的数据库、集合键、哈希键或者有序集合键时,如果用户没有使用 MATCH 选项,那么命令返回的数量通常和 COUNT 选项指定的一样,或者多一些(?),在迭代编码为整数集合(intset:一个由整数值构成的小集合)或编码为压缩列表(ziplist:由不同值构成的一个小哈希或者一个小有序集合)时,会无视 COUNT 选项指定的值,在第一次迭代就将数据集的所有元素都返回给用户。
  3. MATCH 选项,直接看示例吧,如下
示例:
redis 127.0.0.1:6379> sadd myset 1 2 3 foo foobar feelsgood(integer) 6redis 127.0.0.1:6379> sscan myset 0 match f*1) "0"2) 1) "foo"   2) "feelsgood"   3) "foobar"复制代码
注意:对元素的模式匹配工作是在命令从数据集中取出元素之后,向客户端返回元素之前进行的,所以有可能返回空
示例:
redis 127.0.0.1:6379> scan 0 MATCH *11*1) "288"2) 1) "key:911"redis 127.0.0.1:6379> scan 288 MATCH *11*1) "224"2) (empty list or set)redis 127.0.0.1:6379> scan 224 MATCH *11*1) "80"2) (empty list or set)redis 127.0.0.1:6379> scan 80 MATCH *11*1) "176"2) (empty list or set)redis 127.0.0.1:6379> scan 176 MATCH *11* COUNT 10001) "0"2)  1) "key:611"    2) "key:711"    3) "key:118"    4) "key:117"    5) "key:311"    6) "key:112"    7) "key:111"    8) "key:110"    9) "key:113"   10) "key:211"   11) "key:411"   12) "key:115"   13) "key:116"   14) "key:114"   15) "key:119"   16) "key:811"   17) "key:511"   18) "key:11"复制代码
注意:最后一次迭代,通过 COUNT 选项指定为 1000 强制命令为本次迭代扫描更多元素,从而使返回的元素也变多了。

DEL 命令

这个比较简单,删除给定的一个或者多个 key
redis> SET name "redis"OKredis> SET type "key-value store"OKredis> SET website "redis.com"OKredis> DEL name type website(integer) 3复制代码

使用 Python SCAN

安装 redis 包
pip install redis复制代码
完整代码示例:
import redispool=redis.ConnectionPool(host='redis_hostname', port=6379, max_connections=100)r = redis.StrictRedis(connection_pool=pool)cursor_number, keys = r.execute_command('scan', 0, "count", 200000)while True:    if cursor_number == 0:        # 结束一次完整的比遍历        break    cursor_number, keys = r.execute_command('scan', cursor_number, "count", 200000)    # do something with keys复制代码
我将需要删除的 key 存在一个文件里,有 2.2G,大概 4000W 个,下一步就是删除了

使用 Python DEL

因为文件很大,我们用到一个小技巧,分块读取
with open("/data/rediskeys") as kf:    lines = kf.readlines(1024*1024)复制代码
调用 delete 方法时,用到一个小技巧就是『*』星号
r.delete(*taskkey_list)复制代码
我们看一下定义就清楚了:
delete method
放上完整代码:
import redisimport timepool=redis.ConnectionPool(host='redis_hostname', port=6379, max_connections=100)r = redis.StrictRedis(connection_pool=pool)start_time = time.time()SUCCESS_DELETED = 0with open("/data/rediskeys") as kf:    while True:        lines = kf.readlines(1024*1024)        if not lines:            break        else:            taskkey_list = [i.strip() for i in lines if i.startswith("UCS:TASKKEY")]            SUCCESS_DELETED += r.delete(*taskkey_list)        print SUCCESS_DELETEDend_time = time.time()print end_time - start_time, SUCCESS_DELETED复制代码

成果展示

结束,下篇再见
· · ·

转载地址:http://tclhx.baihongyu.com/

你可能感兴趣的文章
差异分析定位Ring 3保护模块
查看>>
2013年7月12日“修复 Migration 测试发现的 Bug”
查看>>
vim文本编辑器详解
查看>>
学习vue中遇到的报错,特此记录下来
查看>>
CentOS7 编译安装 Mariadb
查看>>
32位系统和64位系统的选择
查看>>
01配置管理过程指南
查看>>
jstl格式化时间
查看>>
一则关于运算符的小例
查看>>
centos7 ambari2.6.1.5+hdp2.6.4.0 大数据集群安装部署
查看>>
cronexpression 详解
查看>>
一周小程序学习 第1天
查看>>
小孩的linux
查看>>
SpringMVC、MyBatis声明式事务管理
查看>>
开发者详解:端游及手游服务端的常用架构
查看>>
JavaScript History对象
查看>>
在 Windows 下安装 Oracle 11g XE (Express Edition)
查看>>
ListView优化
查看>>
【原创】 PostgreSQL 实现MySQL 的auto_increment 字段
查看>>
vs2015添加vc助手
查看>>