最近在看OSS存储发现13年做的一些网站存储数据,可惜网站这些都已经被删除了,一直保留了李金龙这个网站一直使用着,看着每天消耗的金额,想着要么就直接删除了,但是最恶心的是,删除需要10条10条的删除,想着几十万的数据,要删除到啥时候,就写了下面的代码。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 七牛云存储空间删除工具 先删除空间中的所有内容,然后删除空间本身 """ import sys from qiniu import Auth, BucketManager, build_batch_delete from qiniu.config import set_default # 尝试从配置文件导入凭证 try: from config import ACCESS_KEY, SECRET_KEY, DEFAULT_REGION except ImportError: # 如果配置文件不存在,使用默认值(需要用户输入) ACCESS_KEY = None SECRET_KEY = None DEFAULT_REGION = 'z0' def delete_bucket_contents(access_key, secret_key, bucket_name, region='z0'): """ 删除存储空间中的所有内容 Args: access_key: 七牛云 AccessKey secret_key: 七牛云 SecretKey bucket_name: 存储空间名称 region: 存储区域,默认为 z0(华东) Returns: bool: 是否成功删除所有内容 """ # 初始化认证对象 q = Auth(access_key, secret_key) # 初始化Bucket管理对象 bucket = BucketManager(q) # 设置区域 set_default(default_zone=region) print(f"开始列举存储空间 '{bucket_name}' 中的所有对象...") # 列举所有对象 marker = None total_count = 0 deleted_count = 0 while True: ret, eof, info = bucket.list(bucket_name, limit=1000, marker=marker) if ret is None: print(f"列举对象失败: {info}") return False items = ret.get('items', []) # 如果没有对象了,退出循环 if len(items) == 0: break total_count += len(items) # 批量删除对象(每次最多1000个) keys = [item['key'] for item in items] print(f"正在删除 {len(keys)} 个对象...") # 构建批量删除操作 ops = build_batch_delete(bucket_name, keys) batch_ret, batch_info = bucket.batch(ops) if batch_ret is None: print(f"批量删除失败: {batch_info}") return False # 统计删除成功的数量 # 注意:612错误表示文件不存在,这不算失败(可能已经被删除过了) for item in batch_ret: code = item.get('code') if code == 200: deleted_count += 1 elif code == 612: # 文件不存在,也算作已删除(可能之前已经删除过) deleted_count += 1 else: # 其他错误才打印 error_msg = item.get('data', {}).get('error', item.get('error', '未知错误')) print(f"删除对象失败 (code {code}): {error_msg}") print(f"已删除 {deleted_count}/{total_count} 个对象") # 如果已经列举完所有对象,退出循环 if eof: break # 更新marker继续列举 marker = ret.get('marker') print(f"\n所有内容删除完成!共删除 {deleted_count} 个对象") return True def delete_bucket(access_key, secret_key, bucket_name, region='z0'): """ 删除存储空间 Args: access_key: 七牛云 AccessKey secret_key: 七牛云 SecretKey bucket_name: 存储空间名称 region: 存储区域,默认为 z0(华东) Returns: bool: 是否成功删除空间 """ # 初始化认证对象 q = Auth(access_key, secret_key) # 初始化Bucket管理对象 bucket = BucketManager(q) # 设置区域 set_default(default_zone=region) print(f"\n正在删除存储空间 '{bucket_name}'...") # 使用 drop API 删除存储空间 # 根据七牛云API文档,drop API的端点是 POST /drop/<bucket> try: # 使用BucketManager的drop方法(如果存在) if hasattr(bucket, 'drop'): ret, info = bucket.drop(bucket_name) else: # 手动调用 drop API from qiniu import http # drop API的URL格式: /drop/<bucket> entry = f'{bucket_name}' url = f'http://rs.qiniu.com/drop/{entry}' # 生成管理凭证 token = q.token_of_request(url) ret, info = http._post_with_token(url, None, token) # 检查返回结果 if ret is not None: # 如果返回了结果,检查是否有错误 if isinstance(ret, dict) and ret.get('error'): print(f"删除存储空间失败: {ret.get('error')}") return False else: print(f"存储空间 '{bucket_name}' 删除成功!") return True elif hasattr(info, 'status_code') and info.status_code == 200: print(f"存储空间 '{bucket_name}' 删除成功!") return True else: error_msg = info.text_body if hasattr(info, 'text_body') else str(info) print(f"删除存储空间失败: {error_msg}") return False except Exception as e: print(f"删除存储空间失败: {e}") import traceback traceback.print_exc() return False def main(): """主函数""" print("=" * 60) print("七牛云存储空间删除工具") print("=" * 60) # 获取用户输入(如果配置文件中已有凭证,则使用配置文件中的) if ACCESS_KEY and SECRET_KEY: print(f"使用配置文件中的凭证") access_key = ACCESS_KEY secret_key = SECRET_KEY else: access_key = input("请输入 AccessKey: ").strip() secret_key = input("请输入 SecretKey: ").strip() bucket_name = input("请输入要删除的存储空间名称: ").strip() # 区域选择 print("\n存储区域选择:") print("z0 - 华东") print("z1 - 华北") print("z2 - 华南") print("na0 - 北美") print("as0 - 东南亚") region = input(f"请输入存储区域 (默认 {DEFAULT_REGION}): ").strip() or DEFAULT_REGION # 确认操作 print(f"\n警告: 即将删除存储空间 '{bucket_name}' 中的所有内容,然后删除空间本身!") confirm = input("确认继续?(输入 'yes' 确认): ").strip() if confirm.lower() != 'yes': print("操作已取消") return # 先删除空间中的所有内容 if not delete_bucket_contents(access_key, secret_key, bucket_name, region): print("\n删除空间内容失败,操作终止") return # 再删除空间本身 if not delete_bucket(access_key, secret_key, bucket_name, region): print("\n删除存储空间失败") return print("\n" + "=" * 60) print("操作完成!") print("=" * 60) if __name__ == '__main__': try: main() except KeyboardInterrupt: print("\n\n操作被用户中断") sys.exit(1) except Exception as e: print(f"\n发生错误: {e}") sys.exit(1)
除了上面的代码还需要电脑本地搭建python环境,以及安装qiniu>=7.11.0 这个包。
