Python-boto3接口

import boto3
import time
from botocore.client import Config
from botocore.exceptions import ClientError

# 用户的access_key和secret_key可以通过命令行工具udbto-cli或者通过api访问master节点获取
# ./udbto-cli user info testuser
# curl -v "http://192.168.2.190:15010/user/info?user=testuser"
CONFIG = {
    'endpoint': 'http://192.168.2.190:15410',   
    'access_key': 'j1IbHJ9Q6hmdQqOw',     
    'secret_key': '2SoQO1EkOSs1yRIGrd1hw6VUp6Ejhta6',
    'region': 'us-east-1'
}

# 全局S3客户端
_s3_client = None

def get_s3_client():
    global _s3_client
    if _s3_client is None:
        _s3_client = boto3.client(
            's3',
            endpoint_url=CONFIG['endpoint'],
            aws_access_key_id=CONFIG['access_key'],
            aws_secret_access_key=CONFIG['secret_key'],
            config=Config(signature_version='s3v4'),
            region_name=CONFIG['region']
        )
    return _s3_client

def wait_for_bucket(bucket_name, max_retries=30, delay=1):
    """等待桶完全创建并可用"""
    s3 = get_s3_client()
    
    for i in range(max_retries):
        try:
            s3.head_bucket(Bucket=bucket_name)
            try:
                s3.list_objects_v2(Bucket=bucket_name, MaxKeys=0)
                return True
            except ClientError:
                if i == max_retries - 1:
                    print(f"桶 '{bucket_name}' 存在但操作可能受限")
                time.sleep(delay)
                continue
        except ClientError:
            if i == max_retries - 1:
                print(f"错误: 桶 '{bucket_name}' 在 {max_retries} 次尝试后仍未就绪")
                return False
            time.sleep(delay)
    
    return False

# ================ 存储桶操作 ================

def create_bucket(bucket_name):
    """创建存储桶"""
    s3 = get_s3_client()
    try:
        # 检查桶是否已存在
        try:
            s3.head_bucket(Bucket=bucket_name)
            print(f"桶 '{bucket_name}' 已存在")
            return True
        except ClientError:
            pass
        
        # 创建桶
        s3.create_bucket(Bucket=bucket_name)
        print(f"桶 '{bucket_name}' 创建成功")
        
        # 等待桶就绪
        print(f"等待桶就绪...")
        if wait_for_bucket(bucket_name):
            return True
        else:
            print(f"警告: 桶 '{bucket_name}' 创建后未完全就绪")
            return False
            
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        if error_code == 'BucketAlreadyOwnedByYou':
            print(f"桶 '{bucket_name}' 已存在")
            return True
        else:
            print(f"创建桶失败: {e}")
            return False

def delete_bucket(bucket_name):
    """删除存储桶(会自动删除桶内所有文件)"""
    s3 = get_s3_client()
    
    try:
        # 检查桶是否存在
        try:
            s3.head_bucket(Bucket=bucket_name)
        except ClientError:
            print(f"桶 '{bucket_name}' 不存在")
            return False
        
        # 删除桶内所有文件
        try:
            # 列出所有对象
            objects_to_delete = []
            paginator = s3.get_paginator('list_objects_v2')
            
            for page in paginator.paginate(Bucket=bucket_name):
                if 'Contents' in page:
                    for obj in page['Contents']:
                        objects_to_delete.append({'Key': obj['Key']})
            
            # 批量删除对象
            if objects_to_delete:
                for i in range(0, len(objects_to_delete), 1000):
                    batch = objects_to_delete[i:i+1000]
                    s3.delete_objects(
                        Bucket=bucket_name,
                        Delete={'Objects': batch}
                    )
                print(f"已删除桶 '{bucket_name}' 中的 {len(objects_to_delete)} 个文件")
        
        except ClientError as e:
            pass
        
        # 删除桶
        s3.delete_bucket(Bucket=bucket_name)
        print(f"桶 '{bucket_name}' 删除成功")
        return True
        
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        if error_code == 'NoSuchBucket':
            print(f"桶 '{bucket_name}' 不存在")
        else:
            print(f"删除桶失败: {e}")
        return False

def list_buckets():
    """列出所有存储桶"""
    s3 = get_s3_client()
    try:
        response = s3.list_buckets()
        buckets = response.get('Buckets', [])
        
        if buckets:
            print(f"共有 {len(buckets)} 个桶:")
            for bucket in buckets:
                print(f"  - {bucket['Name']}")
            return buckets
        else:
            print("没有存储桶")
            return []
    except ClientError as e:
        print(f"列出存储桶失败: {e}")
        return None

# ================ 文件操作 ================

def create_file(bucket_name, filename, content):
    """在指定桶中创建文件"""
    s3 = get_s3_client()
    try:
        s3.put_object(
            Bucket=bucket_name,
            Key=filename,
            Body=content.encode('utf-8'),
            ContentType='text/plain'
        )
        print(f"在桶 '{bucket_name}' 中创建文件成功: {filename}")
        return True
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        if error_code == 'NoSuchBucket':
            print(f"桶 '{bucket_name}' 不存在")
        else:
            print(f"创建文件失败: {e}")
        return False

def read_file(bucket_name, filename):
    """从指定桶中读取文件内容"""
    s3 = get_s3_client()
    try:
        response = s3.get_object(Bucket=bucket_name, Key=filename)
        content = response['Body'].read().decode('utf-8')
        print(f"从桶 '{bucket_name}' 中读取文件 '{filename}' 内容:")
        print(content)
        return content
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        if error_code == 'NoSuchBucket':
            print(f"桶 '{bucket_name}' 不存在")
        elif error_code == 'NoSuchKey':
            print(f"文件 '{filename}' 在桶 '{bucket_name}' 中不存在")
        else:
            print(f"读取文件失败: {e}")
        return None

def update_file(bucket_name, filename, new_content):
    """更新指定桶中的文件内容"""
    s3 = get_s3_client()
    try:
        s3.put_object(
            Bucket=bucket_name,
            Key=filename,
            Body=new_content.encode('utf-8'),
            ContentType='text/plain'
        )
        print(f"在桶 '{bucket_name}' 中更新文件成功: {filename}")
        return True
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        if error_code == 'NoSuchBucket':
            print(f"桶 '{bucket_name}' 不存在")
        else:
            print(f"更新文件失败: {e}")
        return False

def delete_file(bucket_name, filename):
    """删除指定桶中的文件"""
    s3 = get_s3_client()
    try:
        s3.delete_object(Bucket=bucket_name, Key=filename)
        print(f"从桶 '{bucket_name}' 中删除文件成功: {filename}")
        return True
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        if error_code == 'NoSuchBucket':
            print(f"桶 '{bucket_name}' 不存在")
        elif error_code == 'NoSuchKey':
            print(f"文件 '{filename}' 在桶 '{bucket_name}' 中不存在")
        else:
            print(f"删除文件失败: {e}")
        return False

def list_files(bucket_name):
    """列出指定桶中的所有文件"""
    s3 = get_s3_client()
    try:
        response = s3.list_objects_v2(Bucket=bucket_name)
        
        if 'Contents' in response:
            files = response['Contents']
            print(f"桶 '{bucket_name}' 中共有 {len(files)} 个文件:")
            for obj in files:
                print(f"  - {obj['Key']} ({obj['Size']} 字节)")
            return files
        else:
            print(f"桶 '{bucket_name}' 为空")
            return []
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        if error_code == 'NoSuchBucket':
            print(f"桶 '{bucket_name}' 不存在")
        else:
            print(f"列出文件失败: {e}")
        return []

# ================ 测试函数 ================

def test_basic_operations():
    """测试所有基本操作"""

    # 1. 列出所有桶
    print("\n1. 当前所有存储桶:")
    list_buckets()
    
    # 2. 创建测试桶
    print("\n2. 创建测试存储桶...")
    test_bucket = "test-bucket-2"
    
    if not create_bucket(test_bucket):
        print("创建测试桶失败,终止测试")
        return
    
    # 3. 创建文件
    print("\n3. 在测试桶中创建文件...")
    create_file(test_bucket, "file1.txt", "这是第一个文件的内容")
    create_file(test_bucket, "file2.txt", "这是第二个文件的内容")
    
    # 4. 列出文件
    print("\n4. 测试桶中的文件列表:")
    list_files(test_bucket)
    
    # 5. 读取文件
    print("\n5. 读取文件内容...")
    read_file(test_bucket, "file1.txt")
    
    # 6. 更新文件
    print("\n6. 更新文件内容...")
    update_file(test_bucket, "file1.txt", "这是更新后的文件内容")
    print("\n更新后读取文件内容:")
    read_file(test_bucket, "file1.txt")
    
    # 7. 删除文件
    print("\n7. 删除文件...")
    delete_file(test_bucket, "file1.txt")
    delete_file(test_bucket, "file2.txt")
    print("\n删除后文件列表:")
    list_files(test_bucket)
    
    # 8. 删除桶
    print("\n8. 删除测试桶...")
    delete_bucket(test_bucket)
    
    # 9. 最终桶列表
    print("\n9. 最终存储桶列表:")
    list_buckets()
    
    print("所有基本操作测试完成!")

# ================ 使用示例 ================
if __name__ == "__main__":
    # 运行完整测试
    test_basic_operations()
    
    # 也可以单独使用:
    # 1. 创建桶
    # create_bucket("my-bucket")
    
    # 2. 创建文件
    # create_file("my-bucket", "hello.txt", "Hello World!")
    
    # 3. 读取文件
    # read_file("my-bucket", "hello.txt")
    
    # 4. 列出文件
    # list_files("my-bucket")
    
    # 5. 更新文件
    # update_file("my-bucket", "hello.txt", "Updated content")
    
    # 6. 删除文件
    # delete_file("my-bucket", "hello.txt")
    
    # 7. 删除桶
    # delete_bucket("my-bucket")