七牛云存储空间删除工具python版

李金龙
李金龙
管理员
496
文章
0
粉丝
python工具评论13字数 1350阅读模式

最近在看OSS存储发现13年做的一些网站存储数据,可惜网站这些都已经被删除了,一直保留了李金龙这个网站一直使用着,看着每天消耗的金额,想着要么就直接删除了,但是最恶心的是,删除需要10条10条的删除,想着几十万的数据,要删除到啥时候,就写了下面的代码。七牛云存储空间删除工具python版

 

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
七牛云存储空间删除工具
先删除空间中的所有内容,然后删除空间本身
"""

import sys
from qiniu import Auth, BucketManager, build_batch_delete
from qiniu.config import set_default

# 尝试从配置文件导入凭证
try:
    from config import ACCESS_KEY, SECRET_KEY, DEFAULT_REGION
except ImportError:
    # 如果配置文件不存在,使用默认值(需要用户输入)
    ACCESS_KEY = None
    SECRET_KEY = None
    DEFAULT_REGION = 'z0'


def delete_bucket_contents(access_key, secret_key, bucket_name, region='z0'):
    """
    删除存储空间中的所有内容
    
    Args:
        access_key: 七牛云 AccessKey
        secret_key: 七牛云 SecretKey
        bucket_name: 存储空间名称
        region: 存储区域,默认为 z0(华东)
    
    Returns:
        bool: 是否成功删除所有内容
    """
    # 初始化认证对象
    q = Auth(access_key, secret_key)
    
    # 初始化Bucket管理对象
    bucket = BucketManager(q)
    
    # 设置区域
    set_default(default_zone=region)
    
    print(f"开始列举存储空间 '{bucket_name}' 中的所有对象...")
    
    # 列举所有对象
    marker = None
    total_count = 0
    deleted_count = 0
    
    while True:
        ret, eof, info = bucket.list(bucket_name, limit=1000, marker=marker)
        
        if ret is None:
            print(f"列举对象失败: {info}")
            return False
        
        items = ret.get('items', [])
        
        # 如果没有对象了,退出循环
        if len(items) == 0:
            break
        
        total_count += len(items)
        
        # 批量删除对象(每次最多1000个)
        keys = [item['key'] for item in items]
        
        print(f"正在删除 {len(keys)} 个对象...")
        
        # 构建批量删除操作
        ops = build_batch_delete(bucket_name, keys)
        batch_ret, batch_info = bucket.batch(ops)
        
        if batch_ret is None:
            print(f"批量删除失败: {batch_info}")
            return False
        
        # 统计删除成功的数量
        # 注意:612错误表示文件不存在,这不算失败(可能已经被删除过了)
        for item in batch_ret:
            code = item.get('code')
            if code == 200:
                deleted_count += 1
            elif code == 612:
                # 文件不存在,也算作已删除(可能之前已经删除过)
                deleted_count += 1
            else:
                # 其他错误才打印
                error_msg = item.get('data', {}).get('error', item.get('error', '未知错误'))
                print(f"删除对象失败 (code {code}): {error_msg}")
        
        print(f"已删除 {deleted_count}/{total_count} 个对象")
        
        # 如果已经列举完所有对象,退出循环
        if eof:
            break
        
        # 更新marker继续列举
        marker = ret.get('marker')
    
    print(f"\n所有内容删除完成!共删除 {deleted_count} 个对象")
    return True


def delete_bucket(access_key, secret_key, bucket_name, region='z0'):
    """
    删除存储空间
    
    Args:
        access_key: 七牛云 AccessKey
        secret_key: 七牛云 SecretKey
        bucket_name: 存储空间名称
        region: 存储区域,默认为 z0(华东)
    
    Returns:
        bool: 是否成功删除空间
    """
    # 初始化认证对象
    q = Auth(access_key, secret_key)
    
    # 初始化Bucket管理对象
    bucket = BucketManager(q)
    
    # 设置区域
    set_default(default_zone=region)
    
    print(f"\n正在删除存储空间 '{bucket_name}'...")
    
    # 使用 drop API 删除存储空间
    # 根据七牛云API文档,drop API的端点是 POST /drop/<bucket>
    try:
        # 使用BucketManager的drop方法(如果存在)
        if hasattr(bucket, 'drop'):
            ret, info = bucket.drop(bucket_name)
        else:
            # 手动调用 drop API
            from qiniu import http
            # drop API的URL格式: /drop/<bucket>
            entry = f'{bucket_name}'
            url = f'http://rs.qiniu.com/drop/{entry}'
            # 生成管理凭证
            token = q.token_of_request(url)
            ret, info = http._post_with_token(url, None, token)
        
        # 检查返回结果
        if ret is not None:
            # 如果返回了结果,检查是否有错误
            if isinstance(ret, dict) and ret.get('error'):
                print(f"删除存储空间失败: {ret.get('error')}")
                return False
            else:
                print(f"存储空间 '{bucket_name}' 删除成功!")
                return True
        elif hasattr(info, 'status_code') and info.status_code == 200:
            print(f"存储空间 '{bucket_name}' 删除成功!")
            return True
        else:
            error_msg = info.text_body if hasattr(info, 'text_body') else str(info)
            print(f"删除存储空间失败: {error_msg}")
            return False
    except Exception as e:
        print(f"删除存储空间失败: {e}")
        import traceback
        traceback.print_exc()
        return False


def main():
    """主函数"""
    print("=" * 60)
    print("七牛云存储空间删除工具")
    print("=" * 60)
    
    # 获取用户输入(如果配置文件中已有凭证,则使用配置文件中的)
    if ACCESS_KEY and SECRET_KEY:
        print(f"使用配置文件中的凭证")
        access_key = ACCESS_KEY
        secret_key = SECRET_KEY
    else:
        access_key = input("请输入 AccessKey: ").strip()
        secret_key = input("请输入 SecretKey: ").strip()
    
    bucket_name = input("请输入要删除的存储空间名称: ").strip()
    
    # 区域选择
    print("\n存储区域选择:")
    print("z0 - 华东")
    print("z1 - 华北")
    print("z2 - 华南")
    print("na0 - 北美")
    print("as0 - 东南亚")
    region = input(f"请输入存储区域 (默认 {DEFAULT_REGION}): ").strip() or DEFAULT_REGION
    
    # 确认操作
    print(f"\n警告: 即将删除存储空间 '{bucket_name}' 中的所有内容,然后删除空间本身!")
    confirm = input("确认继续?(输入 'yes' 确认): ").strip()
    
    if confirm.lower() != 'yes':
        print("操作已取消")
        return
    
    # 先删除空间中的所有内容
    if not delete_bucket_contents(access_key, secret_key, bucket_name, region):
        print("\n删除空间内容失败,操作终止")
        return
    
    # 再删除空间本身
    if not delete_bucket(access_key, secret_key, bucket_name, region):
        print("\n删除存储空间失败")
        return
    
    print("\n" + "=" * 60)
    print("操作完成!")
    print("=" * 60)


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n操作被用户中断")
        sys.exit(1)
    except Exception as e:
        print(f"\n发生错误: {e}")
        sys.exit(1)

除了上面的代码还需要电脑本地搭建python环境,以及安装qiniu>=7.11.0 这个包。

 
李金龙
  • 本文由 李金龙 发表于2025年11月12日 13:57:41
  • 转载请务必保留本文链接:https://www.lijinlong.cc/python/tool/4424.html
python工具

python小工具,批量监控网站状态

前面为铺垫行内容,当手持大量的网站的时候,发现网站的管理是个很大的问题,特别还是面向dedecms这样的程序做的网站,那真的是天天刺激。 所以就在撸爬虫的时候就顺手写了个这样的小工具,提高日常的工作效...
匿名

发表评论

匿名网友
确定

拖动滑块以完成验证