import boto3
import time
from botocore.client import Config
from botocore.exceptions import ClientError
# 用户的access_key和secret_key可以通过命令行工具udbto-cli或者通过api访问master节点获取
# ./udbto-cli user info testuser
# curl -v "http://192.168.2.190:15010/user/info?user=testuser"
CONFIG = {
'endpoint': 'http://192.168.2.190:15410',
'access_key': 'j1IbHJ9Q6hmdQqOw',
'secret_key': '2SoQO1EkOSs1yRIGrd1hw6VUp6Ejhta6',
'region': 'us-east-1'
}
# 全局S3客户端
_s3_client = None
def get_s3_client():
global _s3_client
if _s3_client is None:
_s3_client = boto3.client(
's3',
endpoint_url=CONFIG['endpoint'],
aws_access_key_id=CONFIG['access_key'],
aws_secret_access_key=CONFIG['secret_key'],
config=Config(signature_version='s3v4'),
region_name=CONFIG['region']
)
return _s3_client
def wait_for_bucket(bucket_name, max_retries=30, delay=1):
"""等待桶完全创建并可用"""
s3 = get_s3_client()
for i in range(max_retries):
try:
s3.head_bucket(Bucket=bucket_name)
try:
s3.list_objects_v2(Bucket=bucket_name, MaxKeys=0)
return True
except ClientError:
if i == max_retries - 1:
print(f"桶 '{bucket_name}' 存在但操作可能受限")
time.sleep(delay)
continue
except ClientError:
if i == max_retries - 1:
print(f"错误: 桶 '{bucket_name}' 在 {max_retries} 次尝试后仍未就绪")
return False
time.sleep(delay)
return False
# ================ 存储桶操作 ================
def create_bucket(bucket_name):
"""创建存储桶"""
s3 = get_s3_client()
try:
# 检查桶是否已存在
try:
s3.head_bucket(Bucket=bucket_name)
print(f"桶 '{bucket_name}' 已存在")
return True
except ClientError:
pass
# 创建桶
s3.create_bucket(Bucket=bucket_name)
print(f"桶 '{bucket_name}' 创建成功")
# 等待桶就绪
print(f"等待桶就绪...")
if wait_for_bucket(bucket_name):
return True
else:
print(f"警告: 桶 '{bucket_name}' 创建后未完全就绪")
return False
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code')
if error_code == 'BucketAlreadyOwnedByYou':
print(f"桶 '{bucket_name}' 已存在")
return True
else:
print(f"创建桶失败: {e}")
return False
def delete_bucket(bucket_name):
"""删除存储桶(会自动删除桶内所有文件)"""
s3 = get_s3_client()
try:
# 检查桶是否存在
try:
s3.head_bucket(Bucket=bucket_name)
except ClientError:
print(f"桶 '{bucket_name}' 不存在")
return False
# 删除桶内所有文件
try:
# 列出所有对象
objects_to_delete = []
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket_name):
if 'Contents' in page:
for obj in page['Contents']:
objects_to_delete.append({'Key': obj['Key']})
# 批量删除对象
if objects_to_delete:
for i in range(0, len(objects_to_delete), 1000):
batch = objects_to_delete[i:i+1000]
s3.delete_objects(
Bucket=bucket_name,
Delete={'Objects': batch}
)
print(f"已删除桶 '{bucket_name}' 中的 {len(objects_to_delete)} 个文件")
except ClientError as e:
pass
# 删除桶
s3.delete_bucket(Bucket=bucket_name)
print(f"桶 '{bucket_name}' 删除成功")
return True
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code')
if error_code == 'NoSuchBucket':
print(f"桶 '{bucket_name}' 不存在")
else:
print(f"删除桶失败: {e}")
return False
def list_buckets():
"""列出所有存储桶"""
s3 = get_s3_client()
try:
response = s3.list_buckets()
buckets = response.get('Buckets', [])
if buckets:
print(f"共有 {len(buckets)} 个桶:")
for bucket in buckets:
print(f" - {bucket['Name']}")
return buckets
else:
print("没有存储桶")
return []
except ClientError as e:
print(f"列出存储桶失败: {e}")
return None
# ================ 文件操作 ================
def create_file(bucket_name, filename, content):
"""在指定桶中创建文件"""
s3 = get_s3_client()
try:
s3.put_object(
Bucket=bucket_name,
Key=filename,
Body=content.encode('utf-8'),
ContentType='text/plain'
)
print(f"在桶 '{bucket_name}' 中创建文件成功: {filename}")
return True
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code')
if error_code == 'NoSuchBucket':
print(f"桶 '{bucket_name}' 不存在")
else:
print(f"创建文件失败: {e}")
return False
def read_file(bucket_name, filename):
"""从指定桶中读取文件内容"""
s3 = get_s3_client()
try:
response = s3.get_object(Bucket=bucket_name, Key=filename)
content = response['Body'].read().decode('utf-8')
print(f"从桶 '{bucket_name}' 中读取文件 '{filename}' 内容:")
print(content)
return content
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code')
if error_code == 'NoSuchBucket':
print(f"桶 '{bucket_name}' 不存在")
elif error_code == 'NoSuchKey':
print(f"文件 '{filename}' 在桶 '{bucket_name}' 中不存在")
else:
print(f"读取文件失败: {e}")
return None
def update_file(bucket_name, filename, new_content):
"""更新指定桶中的文件内容"""
s3 = get_s3_client()
try:
s3.put_object(
Bucket=bucket_name,
Key=filename,
Body=new_content.encode('utf-8'),
ContentType='text/plain'
)
print(f"在桶 '{bucket_name}' 中更新文件成功: {filename}")
return True
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code')
if error_code == 'NoSuchBucket':
print(f"桶 '{bucket_name}' 不存在")
else:
print(f"更新文件失败: {e}")
return False
def delete_file(bucket_name, filename):
"""删除指定桶中的文件"""
s3 = get_s3_client()
try:
s3.delete_object(Bucket=bucket_name, Key=filename)
print(f"从桶 '{bucket_name}' 中删除文件成功: {filename}")
return True
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code')
if error_code == 'NoSuchBucket':
print(f"桶 '{bucket_name}' 不存在")
elif error_code == 'NoSuchKey':
print(f"文件 '{filename}' 在桶 '{bucket_name}' 中不存在")
else:
print(f"删除文件失败: {e}")
return False
def list_files(bucket_name):
"""列出指定桶中的所有文件"""
s3 = get_s3_client()
try:
response = s3.list_objects_v2(Bucket=bucket_name)
if 'Contents' in response:
files = response['Contents']
print(f"桶 '{bucket_name}' 中共有 {len(files)} 个文件:")
for obj in files:
print(f" - {obj['Key']} ({obj['Size']} 字节)")
return files
else:
print(f"桶 '{bucket_name}' 为空")
return []
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code')
if error_code == 'NoSuchBucket':
print(f"桶 '{bucket_name}' 不存在")
else:
print(f"列出文件失败: {e}")
return []
# ================ 测试函数 ================
def test_basic_operations():
"""测试所有基本操作"""
# 1. 列出所有桶
print("\n1. 当前所有存储桶:")
list_buckets()
# 2. 创建测试桶
print("\n2. 创建测试存储桶...")
test_bucket = "test-bucket-2"
if not create_bucket(test_bucket):
print("创建测试桶失败,终止测试")
return
# 3. 创建文件
print("\n3. 在测试桶中创建文件...")
create_file(test_bucket, "file1.txt", "这是第一个文件的内容")
create_file(test_bucket, "file2.txt", "这是第二个文件的内容")
# 4. 列出文件
print("\n4. 测试桶中的文件列表:")
list_files(test_bucket)
# 5. 读取文件
print("\n5. 读取文件内容...")
read_file(test_bucket, "file1.txt")
# 6. 更新文件
print("\n6. 更新文件内容...")
update_file(test_bucket, "file1.txt", "这是更新后的文件内容")
print("\n更新后读取文件内容:")
read_file(test_bucket, "file1.txt")
# 7. 删除文件
print("\n7. 删除文件...")
delete_file(test_bucket, "file1.txt")
delete_file(test_bucket, "file2.txt")
print("\n删除后文件列表:")
list_files(test_bucket)
# 8. 删除桶
print("\n8. 删除测试桶...")
delete_bucket(test_bucket)
# 9. 最终桶列表
print("\n9. 最终存储桶列表:")
list_buckets()
print("所有基本操作测试完成!")
# ================ 使用示例 ================
if __name__ == "__main__":
# 运行完整测试
test_basic_operations()
# 也可以单独使用:
# 1. 创建桶
# create_bucket("my-bucket")
# 2. 创建文件
# create_file("my-bucket", "hello.txt", "Hello World!")
# 3. 读取文件
# read_file("my-bucket", "hello.txt")
# 4. 列出文件
# list_files("my-bucket")
# 5. 更新文件
# update_file("my-bucket", "hello.txt", "Updated content")
# 6. 删除文件
# delete_file("my-bucket", "hello.txt")
# 7. 删除桶
# delete_bucket("my-bucket")