内容目录
前往 Azure 创建应用
- 打开 Azure 登录,并选择
管理 Microsoft Entra ID
- 点击
添加
->应用注册
- 重定向URI选择
公共客户端/本机(移动和桌面)
输入http://127.0.0.1:8400
- 点击
管理
->证书和密码
->新客户端密码
创建后记得保存好密钥 - 点击
API权限
选择Files
的几个相关权限
class onedrive:
def __init__(self, client_id, client_secret):
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = 'http://127.0.0.1:8400'
self.oauth2_uri = 'https://login.microsoftonline.com/common/oauth2/token'
self.resource_uri = 'https://graph.microsoft.com'
self.onedrive_uri = self.resource_uri + '/v1.0/me/drive'
self.scope = 'offline_access onedrive.readwrite'
self.header = {
'Content-Type': 'application/x-www-form-urlencoded'
}
self.token = self.read_token()['access_token']
self.header['Authorization'] = f'bearer {self.token}'
def get_code(self):
url = f'https://login.microsoftonline.com/common/oauth2/authorize?' \
f'response_type=code&client_id={self.client_id}&redirect_uri={self.redirect_uri}'
webbrowser.open(url, new=0, autoraise=True)
def get_token(self, url):
code = parse_qs(urlparse(url).query).get('code')[0]
data = {
'client_id': self.client_id,
'redirect_uri': self.redirect_uri,
'client_secret': self.client_secret,
'code': code,
'grant_type': 'authorization_code',
'resource': self.resource_uri
}
resp = requests.post(self.oauth2_uri, headers=self.header, data=data).json()
return self.save_token(resp)
def refresh_token(self):
token = self.read_token(only_read=True)
data = {
'client_id': self.client_id,
'redirect_uri': self.redirect_uri,
'client_secret': self.client_secret,
'refresh_token': token['refresh_token'],
'grant_type': 'refresh_token',
'resource': 'https://graph.microsoft.com'
}
resp = requests.post(self.oauth2_uri, headers=self.header, data=data).json()
return self.save_token(resp)
@staticmethod
def save_token(resp):
if 'error' in resp:
return False
token = {
'access_token': resp['access_token'],
'refresh_token': resp['refresh_token'],
'expires_on': int(resp['expires_on'])
}
with open('token.json', 'w') as f:
f.write(json.dumps(token))
return token
def read_token(self, only_read=False):
if os.path.exists('token.json'):
with open('token.json', 'r') as f:
token = json.loads(f.read())
else:
self.get_code()
token = self.get_token(input('请输入Url:'))
if only_read:
return token
if token['expires_on'] <= int(time.time()):
token = self.refresh_token()
return token
def get_path(self, path, op):
if path[0] == '/': path = path[1:]
if path[-1] == '/': path = path[:-1]
if op[0] == '/': op = op[1:]
return self.onedrive_uri + '/root:/{}:/{}'.format(path, op)
def create_folder(self, path):
path = list(filter(None, path.split('/')))
pa = '/'.join(path[:len(path) - 1])
name = path[len(path) - 1]
data = json.dumps({
"name": name,
"folder": {},
})
r = requests.post(self.get_path(pa, 'children'), headers=self.header, data=data)
return r.status_code
def upload_url(self, path, conflict="fail"):
self.token = self.read_token()['access_token']
r = requests.post(self.get_path(
path, 'createUploadSession'
), headers=self.header)
print(r.text)
if r.status_code == 200:
return r.json()['uploadUrl']
else:
return ""
def upload_file(self, path, data):
size = len(data)
try:
if size > 4000000:
return self.upload_big_file(path, data)
else:
r = requests.put(self.get_path(path, 'content'), headers=self.header, data=data)
if 'error' in r:
return "上传失败"
return "上传成功"
except:
return "上传失败"
def upload_big_file(self, path, data):
url = self.upload_url(path)
if url == "":
return "上传取消"
size = len(data)
chunk_size = 3276800
file_name = path.split('/')[len(path.split('/')) - 1]
pbar = tqdm(total=size, leave=False, unit='B', unit_scale=True, desc=file_name)
for i in range(0, size, chunk_size):
chunk_data = data[i:i + chunk_size]
pbar.update(len(chunk_data))
r = requests.put(url, headers={
'Content-Length': str(len(chunk_data)),
'Content-Range': 'bytes {}-{}/{}'.format(i, i + len(chunk_data) - 1, size)
}, data=chunk_data)
if r.status_code not in [200, 201, 202]:
print("上传出错")
break
def check_file_exists(self, file_name):
# 构造搜索 API 的 URL
base_url = "https://graph.microsoft.com/v1.0/me/drive/root"
search_url = f"{base_url}:/{file_name}:/"
try:
response = requests.get(search_url, headers=self.header)
except:
return True
# print(response.text)
if response.status_code == 200:
# 文件存在
return True
elif response.status_code == 404:
# 文件不存在
return False
else:
# 其他错误处理
return True
def zip_dir(directory, zip_filename):
# 创建ZIP文件
with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
# 遍历目录及其所有子目录
for root, dirs, files in os.walk(directory):
for file in files:
# 获取文件的绝对路径
file_path = os.path.join(root, file)
# 获取文件在ZIP文件中的相对路径
arcname = os.path.relpath(file_path, os.path.join(directory, os.pardir))
# 将文件添加到ZIP文件中
zipf.write(file_path, arcname)
return os.path.join(directory, zip_filename)
client_id = '应用程序(客户端) ID'
client_secret = '客户端密码'
tenant_id = '目录(租户) ID'
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
暂无评论内容