1、创建存储空间
https://help.aliyun.com/document_detail/31885.html?spm=5176.208357.1107607.16.2d62390fsctomh
2、上传文件
# -*- coding: utf-8 -*-
import oss2
# 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。
auth = oss2.Auth('<yourAccessKeyId>', '<yourAccessKeySecret>')
# Endpoint以杭州为例,其它Region请按实际情况填写。
bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', '<yourBucketName>')
# 必须以二进制的方式打开文件,因为需要知道文件包含的字节数。
with open('<yourLocalFile>', 'rb') as fileobj:
# Seek方法用于指定从第1000个字节位置开始读写。上传时会从您指定的第1000个字节位置开始上传,直到文件结束。
fileobj.seek(1000, os.SEEK_SET)
# Tell方法用于返回当前位置。
current = fileobj.tell()
bucket.put_object('<yourObjectName>', fileobj)
Python SDK还提供了一个更加便捷的方法用于上传本地文件:
bucket.put_object_from_file('<yourObjectName>', '<yourLocalFile>')
3、下载文件
https://help.aliyun.com/document_detail/31887.html?spm=5176.208357.1107607.18.2d62390fsctomh
下载到本地
# -*- coding: utf-8 -*-
import oss2
# 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录RAM控制台创建RAM账号。
auth = oss2.Auth('<yourAccessKeyId>', '<yourAccessKeySecret>')
# Endpoint以杭州为例,其它Region请按实际情况填写。
bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', '<yourBucketName>')
# 下载OSS文件到本地文件。如果指定的本地文件存在会覆盖,不存在则新建。
# <yourLocalFile>由本地文件路径加文件名包括后缀组成,例如/users/local/myfile.txt。
# <yourObjectName>表示下载的OSS文件的完整名称,即包含文件后缀在内的完整路径,例如abc/efg/123.jpg。
bucket.get_object_to_file('<yourObjectName>', '<yourLocalFile>')
4、获取oss上的url
# #获取oss上的url
file_url = bucket.sign_url('GET', oss_file_path, 360) #360是链接360秒有效
5、完整实例
# -*- coding: utf-8 -*-
import os
from datetime import datetime
import oss2
# 以下代码展示了一些和文件相关的高级用法,如中文、设置用户自定义元数据、拷贝文件、追加上传等。
# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等信息。
# 通过环境变量获取,或者把诸如“<你的AccessKeyId>”替换成真实的AccessKeyId等。
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<你的Bucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的访问域名>')
bucket_folder = '<bucket存储文件夹>' #bucket_folder = 'test/'
# 确认上面的参数都填写正确了
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
assert '<' not in param, '请设置参数:' + param
# 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行
bucket = oss2.Bucket(
oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)
# 上传本地文件至oss,并获取oss上文件url,url设置360s有效
# 本地path格式如:tests/keywords/优惠中心1.0/券批次领取记录.xls
def upload_oss_getUrl(path, name=None):
# file_path = path
# file_name = name
# oss_file_path = "test/"+file_name
# local_file_path = "./"+file_path+"/"+file_name
# # 文件上传至oss
# bucket.put_object_from_file(
# oss_file_path, local_file_path)
# # 获取oss上的url
# file_url = bucket.sign_url('GET', oss_file_path, 360) # 360是链接360秒有效
# print(file_url)
# return file_url
file_path = path
name = file_path.split('/').pop()
oss_file_path = bucket_folder + name
# 文件上传至oss
bucket.put_object_from_file(
oss_file_path, file_path)
# 获取oss上的url
file_url = bucket.sign_url('GET', oss_file_path, 360) # 360是链接360秒有效
print(file_url)
return file_url
if __name__ == '__main__':
upload_oss_getUrl = upload_oss_getUrl("./tests/keywords/XXX/测试清单.txt")