Simple and easy-to-use MinIO object storage service library with centralized gateway support
Project description
RefStore
简单易用的 MinIO 对象存储服务封装库,支持集中式文件网关。
特性
- 同步/异步 API - 提供完整的同步和异步接口
- Web API (FastAPI) - 基于 FastAPI 的 RESTful 接口
- 集中式文件网关 - 多项目共享网关,通过 GatewayClient SDK 连接
- S3 URI 编码/解码 - 统一的文件标识:
s3://bucket/path/to/file - 可选逻辑桶名映射 - 默认关闭,遵循 AWS S3 最佳实践(使用 Prefix 隔离)
- 配置验证 - 内置配置格式验证和连接测试
- 重试机制 - 带指数退避的自动重试
- 完整的文档和示例 - 包含丰富的使用示例和测试用例
安装
基础安装
pip install refstore
完整安装(包含 Web API)
pip install refstore[web]
网关客户端安装(包含异步网关 SDK)
pip install refstore[gateway]
开发安装
pip install refstore[dev]
全部安装
pip install refstore[all]
快速开始
基础用法(桶映射关闭,推荐)
默认模式下,逻辑桶映射关闭,桶名直接对应 MinIO 物理桶,使用 Prefix 进行项目/租户隔离。
from refstore import RefStore
config = {
"minio": {
"endpoint": "localhost:9000",
"access_key": "your_access_key",
"secret_key": "your_secret_key",
"secure": False,
},
"default_bucket": "my-project",
}
store = RefStore(config)
store.init_buckets()
# 上传文件,用 path 进行项目/租户隔离
uri = store.upload_file(
file_data=b"Hello, RefStore!",
original_filename="test.txt",
content_type="text/plain",
path="tenant-a/documents"
)
print(f"文件已上传: {uri}") # s3://my-project/tenant-a/documents/test.txt
# 生成预签名 URL
url = store.get_presigned_url(uri, expiry_seconds=3600)
print(f"下载 URL: {url}")
# 下载文件
data = store.download_file(uri)
print(f"文件内容: {data.decode('utf-8')}")
# 获取文件信息
info = store.get_file_info(uri)
print(f"文件大小: {info['size_human']}")
# 列出文件
files = store.list_files(prefix="tenant-a/")
print(f"找到 {len(files)} 个文件")
开启逻辑桶名映射(高级用法)
如果你需要将多个逻辑桶映射到不同的物理桶,可以开启桶映射功能:
from refstore import RefStore
config = {
"minio": {
"endpoint": "localhost:9000",
"access_key": "your_access_key",
"secret_key": "your_secret_key",
"secure": False,
},
"enable_bucket_mapping": True,
"bucket_map": {
"user": "physical-user-bucket",
"public": "physical-public-bucket",
},
"default_bucket": "user",
"presigned_expiry": 3600,
}
store = RefStore(config)
store.init_buckets()
# 上传文件 - 默认返回物理桶 URI
uri = store.upload_file(
file_data=b"Hello!",
original_filename="test.txt",
logic_bucket="user",
)
print(f"物理桶 URI: {uri}") # s3://physical-user-bucket/2026/03/06/xxx.txt
# 上传文件 - 返回逻辑桶 URI
uri_logical = store.upload_file(
file_data=b"Hello!",
original_filename="test.txt",
logic_bucket="user",
use_logical_uri=True,
)
print(f"逻辑桶 URI: {uri_logical}") # s3://user/2026/03/06/xxx.txt
# 下载时两种 URI 都支持
data = store.download_file(uri) # 物理桶 URI 可以用
data = store.download_file(uri_logical) # 逻辑桶 URI 也可以用
异步 API
import asyncio
from refstore import AsyncRefStore
config = {
"minio": {
"endpoint": "localhost:9000",
"access_key": "your_access_key",
"secret_key": "your_secret_key",
"secure": False,
},
}
async def main():
async with AsyncRefStore(config) as store:
uri = await store.upload_file(
file_data=b"Hello, Async RefStore!",
original_filename="async_test.txt",
)
data = await store.download_file(uri)
print(data.decode('utf-8'))
# 并发上传多个文件
tasks = [
store.upload_file(b"File 1", "file1.txt")
for _ in range(10)
]
uris = await asyncio.gather(*tasks)
asyncio.run(main())
Web API
启动 Web 服务:
from refstore import web_app, init_web_service
config = {
"minio": {
"endpoint": "localhost:9000",
"access_key": "your_access_key",
"secret_key": "your_secret_key",
"secure": False,
},
}
init_web_service(config)
if __name__ == "__main__":
import uvicorn
uvicorn.run(web_app, host="0.0.0.0", port=8000)
或使用命令行:
uvicorn refstore.web:web_app --host 0.0.0.0 --port 8000 --reload
访问 API 文档:http://localhost:8000/docs
集中式文件网关(GatewayClient)
多项目共享同一个 RefStore 网关服务,各项目无需配置 MinIO 连接信息,只需知道网关地址。
启动网关服务(运维侧):
from refstore import web_app, init_web_service
import uvicorn
config = {
"minio": {
"endpoint": "10.31.31.41:9000",
"access_key": "admin_key",
"secret_key": "admin_secret",
"secure": False,
},
"default_bucket": "shared-storage",
}
init_web_service(config)
uvicorn.run(web_app, host="0.0.0.0", port=8000)
通过 SDK 连接网关(各项目侧):
from refstore.gateway import GatewayClient
# 只需网关地址,无需 MinIO 连接信息
client = GatewayClient(gateway_url="http://gateway-host:8000")
# 查看网关状态
status = client.get_status()
print(f"连接状态: {status['status']}, 桶数量: {status['bucket_count']}")
# 查看配置(脱敏)
config = client.get_config()
# 桶管理
client.create_bucket("my-project-bucket")
buckets = client.list_buckets()
info = client.get_bucket_info("my-project-bucket")
client.delete_bucket("old-bucket", force=True)
# 文件操作
uri_result = client.upload_file(b"Hello Gateway!", filename="test.txt", logic_bucket="default")
uri = uri_result["uri"]
data = client.download_file(uri)
url_result = client.get_presigned_url(uri)
files = client.list_files()
client.delete_file(uri)
异步网关客户端:
import asyncio
from refstore.gateway import AsyncGatewayClient
async def main():
async with AsyncGatewayClient(gateway_url="http://gateway-host:8000") as client:
status = await client.get_status()
result = await client.upload_file(b"Hello!", filename="test.txt")
data = await client.download_file(result["uri"])
asyncio.run(main())
配置
完整配置示例
config = {
# MinIO 连接配置(必需)
"minio": {
"endpoint": "localhost:9000",
"access_key": "your_access_key",
"secret_key": "your_secret_key",
"secure": False,
},
# 逻辑桶映射开关(可选,默认 False)
# 遵循 AWS S3 最佳实践:默认关闭,用 Prefix 隔离
"enable_bucket_mapping": False,
# 逻辑桶名到物理桶名的映射(仅在 enable_bucket_mapping=True 时生效)
"bucket_map": {
"user": "physical-user-bucket",
"public": "physical-public-bucket",
"temp": "physical-temp-bucket",
},
# 默认桶名(可选,默认为 "default")
"default_bucket": "default",
# 预签名 URL 过期时间(可选,默认为 3600 秒)
"presigned_expiry": 3600,
# 公共基础 URL(可选,用于生成预签名URL时替换host部分)
"public_url": "https://cdn.example.com",
}
配置验证
from refstore import ConfigValidator
try:
config = ConfigValidator.normalize_config(your_config)
print("配置有效")
except ConfigError as e:
print(f"配置无效: {e}")
try:
ConfigValidator.test_connection(config)
print("连接成功")
except ConnectionError as e:
print(f"连接失败: {e}")
使用公共URL(反向代理场景)
当你通过nginx等反向代理访问MinIO时,可以使用 public_url 配置来生成使用公共域名的预签名URL:
config = {
"minio": {
"endpoint": "10.31.31.41:9000",
"access_key": "your_access_key",
"secret_key": "your_secret_key",
"secure": False,
},
"public_url": "https://shclzczy.odb.sh.cn/cdip-file-system/",
}
store = RefStore(config)
uri = store.upload_file(b"Hello", "test.txt")
url = store.get_presigned_url(uri)
# url: https://shclzczy.odb.sh.cn/cdip-file-system/default/.../test.txt?X-Amz-Algorithm=...
重要说明:
public_url只影响预签名URL的显示,MinIO客户端的实际连接仍然使用endpoint配置- 预签名URL的签名是基于内部
endpoint计算的 - 如果
public_url包含路径前缀(如/cdip-file-system/),会自动添加到生成的URL中 - 必须正确配置 nginx 的 Host header,否则签名验证会失败
Nginx 配置示例(关键!)
server {
listen 443 ssl;
server_name shclzczy.odb.sh.cn;
ssl_certificate /path/to/cert.pem;
ssl_certificate_key /path/to/key.pem;
location /cdip-file-system/ {
proxy_pass http://10.31.31.41:9000/;
# 关键:Host header 必须设置为 MinIO 的内部地址
proxy_set_header Host 10.31.31.41:9000;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_connect_timeout 300;
proxy_http_version 1.1;
proxy_set_header Connection "";
chunked_transfer_encoding off;
}
}
URI 操作
RefStore 使用标准化的 S3 URI 格式:s3://bucket/path/to/file
from refstore import encode_uri, decode_uri, validate_uri, get_bucket_from_uri
# 编码 URI
uri = encode_uri("my-bucket", "documents/report.pdf")
# s3://my-bucket/documents/report.pdf
# 解码 URI
bucket, object_name = decode_uri(uri)
# bucket: my-bucket, object_name: documents/report.pdf
# 验证 URI
is_valid = validate_uri(uri)
# True
# 提取桶名
bucket = get_bucket_from_uri(uri)
# my-bucket
# 提取对象名称
object_name = get_object_name_from_uri(uri)
# documents/report.pdf
# 获取文件扩展名
ext = get_file_extension(uri)
# .pdf
高级功能
重试机制
from refstore import retry_with_backoff
@retry_with_backoff(max_retries=3, base_delay=1.0)
def upload_with_retry(store, data, filename):
return store.upload_file(data, filename)
桶管理
from refstore import BucketManager
bucket_manager = BucketManager(minio_client)
bucket_manager.create_bucket("my-bucket")
exists = bucket_manager.bucket_exists("my-bucket")
buckets = bucket_manager.list_buckets()
info = bucket_manager.get_bucket_info("my-bucket")
bucket_manager.delete_bucket("my-bucket", force=True)
批量操作
uris = [
"s3://default/file1.txt",
"s3://default/file2.txt",
"s3://default/file3.txt",
]
result = store.delete_files(uris)
print(f"删除成功: {len(result['deleted'])}")
print(f"删除失败: {len(result['failed'])}")
网关桶映射热更新
通过网关 API 或 SDK 可以在运行时更新桶映射配置:
from refstore.gateway import GatewayClient
client = GatewayClient(gateway_url="http://gateway-host:8000")
# 查看当前映射
mapping = client.get_bucket_mapping()
print(f"映射开启: {mapping['enable_bucket_mapping']}")
print(f"映射表: {mapping['bucket_map']}")
# 热更新映射
client.update_bucket_mapping(
enable_bucket_mapping=True,
bucket_map={
"user": "prod-user-bucket",
"logs": "prod-logs-bucket",
}
)
API 文档
同步 API
RefStore- 同步文件服务类upload_file(file_data, original_filename, content_type, logic_bucket, path, use_logical_uri)- 上传文件upload_from_local(local_path, logic_bucket, path, content_type, use_logical_uri)- 从本地路径上传upload_from_url(url, logic_bucket, path, timeout, use_logical_uri)- 从 URL 上传download_file(s3_uri)- 下载文件到内存download_to_local(s3_uri, local_path)- 下载文件到本地get_presigned_url(s3_uri, expiry_seconds, method)- 生成预签名 URLget_file_info(s3_uri)- 获取文件信息file_exists(s3_uri)- 检查文件是否存在delete_file(s3_uri)- 删除文件delete_files(s3_uris)- 批量删除文件list_files(logic_bucket, prefix, recursive, use_logical_uri)- 列出文件get_physical_bucket(logic_bucket)- 逻辑桶名 -> 物理桶名get_logical_bucket(physical_bucket)- 物理桶名 -> 逻辑桶名
异步 API
AsyncRefStore- 异步文件服务类(方法签名与同步 API 相同)
Web API 端点
文件操作:
POST /upload- 上传文件GET /download- 下载文件GET /presigned-url- 生成预签名 URLGET /info- 获取文件信息DELETE /delete- 删除文件GET /list- 列出文件GET /health- 健康检查
网关管理:
GET /gateway/status- MinIO 连接状态和服务信息GET /gateway/config- 查看当前配置(脱敏)GET /gateway/buckets- 列出所有桶POST /gateway/buckets- 创建桶GET /gateway/buckets/{name}- 获取桶详情DELETE /gateway/buckets/{name}- 删除桶GET /gateway/bucket-mapping- 查看桶映射配置PUT /gateway/bucket-mapping- 热更新桶映射配置
Gateway SDK
GatewayClient- 同步网关客户端- 管理:
get_status(),get_config(),list_buckets(),create_bucket(),get_bucket_info(),delete_bucket(),get_bucket_mapping(),update_bucket_mapping() - 文件:
upload_file(),upload_from_local(),download_file(),download_to_local(),get_presigned_url(),get_file_info(),delete_file(),delete_files(),list_files()
- 管理:
AsyncGatewayClient- 异步网关客户端(方法签名同上,均为 async)
示例
查看 examples/ 目录获取更多使用示例:
basic_usage.py- 同步 API 基础用法async_usage.py- 异步 API 用法web_service.py- Web API 服务启动web_client.py- Web API 客户端使用config_validation.py- 配置验证示例
运行示例:
python examples/basic_usage.py
python examples/async_usage.py
python examples/web_service.py
python examples/config_validation.py
开发
运行测试
pip install -e ".[dev]"
pytest
pytest --cov=refstore --cov-report=html
代码格式化
black refstore tests examples
isort refstore tests examples
代码检查
flake8 refstore tests examples
mypy refstore
贡献
欢迎贡献!请查看 CONTRIBUTING.md 了解如何参与贡献。
许可证
本项目采用 MIT 许可证。详见 LICENSE 文件。
问题反馈
如果你遇到问题或有建议,请在 GitHub Issues 中提出。
致谢
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file refstore-0.3.0.tar.gz.
File metadata
- Download URL: refstore-0.3.0.tar.gz
- Upload date:
- Size: 41.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
196115a0bc71e0e3e4154109e58e92f61e7c6c1584a505bb3b5942bec02a948c
|
|
| MD5 |
4eff64d50a69927f9f9413b16112b108
|
|
| BLAKE2b-256 |
699a39b1fd2f074c22268910beaca7b845c06ed495bc69d5bf691652f9027b5d
|
File details
Details for the file refstore-0.3.0-py3-none-any.whl.
File metadata
- Download URL: refstore-0.3.0-py3-none-any.whl
- Upload date:
- Size: 37.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7f8a882323fd284eabdb178ea3ab5e8774127f0c3adf4dc99eb84d2c01490de5
|
|
| MD5 |
e14e59ce42348cf35e16c61e76974c7b
|
|
| BLAKE2b-256 |
6f4a8010b85979c35eaa598a53e9cdbd5baf5d6929e2bd7610c8eb0e4b901da6
|