Python 调用 OneDrive API 实现文件上传

内容目录

前往 Azure 创建应用

  1. 打开 Azure 门户并登录,然后选择 管理 Microsoft Entra ID
  2. 点击 添加 -> 应用注册
  3. 重定向URI选择 公共客户端/本机(移动和桌面) 输入 http://127.0.0.1:8400
  4. 点击 管理 -> 证书和密码 -> 新客户端密码 创建后记得保存好密钥
  5. 点击 API权限 -> 添加权限 -> Microsoft Graph -> 委托的权限,勾选 Files 相关权限(如 Files.ReadWrite、Files.ReadWrite.All)以及 offline_access(用于获取刷新令牌)
class onedrive:
    """Small OneDrive client built on the Microsoft Graph REST API.

    The client authenticates with the OAuth2 authorization-code flow, caches
    the resulting tokens in ``token.json`` in the current working directory,
    and offers helpers to create folders, upload files and test whether an
    item exists.

    NOTE: constructing an instance performs I/O. It reads the token cache and,
    when no cache exists, opens a browser and prompts on stdin for the
    redirect URL.
    """

    # Token cache file, relative to the current working directory.
    # NOTE(review): tokens are stored in plain text; protect this file.
    _TOKEN_FILE = 'token.json'
    # Payloads larger than this many bytes are sent through an upload session.
    _SIMPLE_UPLOAD_LIMIT = 4000000
    # Bytes sent per upload-session request (3276800 = 10 * 320 KiB).
    _CHUNK_SIZE = 3276800

    def __init__(self, client_id, client_secret):
        """Store the app credentials and load (or interactively obtain) a token.

        Args:
            client_id: Application (client) ID of the Azure app registration.
            client_secret: Client secret of the Azure app registration.

        Raises:
            RuntimeError: If no access token could be obtained.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = 'http://127.0.0.1:8400'
        self.oauth2_uri = 'https://login.microsoftonline.com/common/oauth2/token'
        self.resource_uri = 'https://graph.microsoft.com'
        self.onedrive_uri = self.resource_uri + '/v1.0/me/drive'
        # Kept for compatibility; no method of this class reads it.
        self.scope = 'offline_access onedrive.readwrite'
        self.header = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
        self._refresh_auth_header()

    def _refresh_auth_header(self):
        """Reload the token (refreshing if expired) and update the auth header.

        Raises:
            RuntimeError: If the token could not be read, obtained or refreshed.
        """
        token = self.read_token()
        if not token:
            raise RuntimeError('failed to obtain a OneDrive access token')
        self.token = token['access_token']
        # Both attributes are updated together so a refreshed token is really
        # the one sent with subsequent requests.
        self.header['Authorization'] = f'bearer {self.token}'

    def _graph_headers(self, content_type=None):
        """Return request headers carrying a current bearer token.

        Args:
            content_type: Optional Content-Type that overrides the default
                stored in ``self.header`` for this request only.
        """
        self._refresh_auth_header()
        headers = dict(self.header)
        if content_type is not None:
            headers['Content-Type'] = content_type
        return headers

    def _post_token(self, data):
        """POST a form to the token endpoint and return the decoded JSON reply."""
        # A dedicated header is used so the Graph bearer token is not sent to
        # the login endpoint.
        form_header = {'Content-Type': 'application/x-www-form-urlencoded'}
        return requests.post(self.oauth2_uri, headers=form_header, data=data).json()

    def get_code(self):
        """Open the authorization page in the default web browser.

        After signing in, the browser is redirected to ``self.redirect_uri``
        with a ``code`` query parameter; that full URL is what ``get_token``
        expects.
        """
        url = f'https://login.microsoftonline.com/common/oauth2/authorize?' \
              f'response_type=code&client_id={self.client_id}&redirect_uri={self.redirect_uri}'
        webbrowser.open(url, new=0, autoraise=True)

    def get_token(self, url):
        """Exchange the authorization code found in ``url`` for tokens.

        Args:
            url: The redirect URL containing a ``code`` query parameter.

        Returns:
            The saved token dict, or ``False`` when ``url`` carries no code or
            the token endpoint reports an error.
        """
        codes = parse_qs(urlparse(url).query).get('code')
        if not codes:
            # E.g. the user pasted the wrong URL or the sign-in was denied.
            return False
        data = {
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'client_secret': self.client_secret,
            'code': codes[0],
            'grant_type': 'authorization_code',
            'resource': self.resource_uri
        }
        return self.save_token(self._post_token(data))

    def refresh_token(self):
        """Use the cached refresh token to obtain and save a new access token.

        Returns:
            The saved token dict, or ``False`` on failure.
        """
        token = self.read_token(only_read=True)
        if not token:
            return False
        data = {
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'client_secret': self.client_secret,
            'refresh_token': token['refresh_token'],
            'grant_type': 'refresh_token',
            'resource': self.resource_uri
        }
        resp = self._post_token(data)
        if 'error' not in resp:
            # Keep the current refresh token if the reply does not rotate it.
            resp.setdefault('refresh_token', token['refresh_token'])
        return self.save_token(resp)

    @staticmethod
    def save_token(resp):
        """Persist the tokens from a token-endpoint reply to the cache file.

        Args:
            resp: Decoded JSON reply of the token endpoint.

        Returns:
            A dict with ``access_token``, ``refresh_token`` and ``expires_on``
            (int), or ``False`` when ``resp`` contains an ``error`` key.
        """
        if 'error' in resp:
            return False
        token = {
            'access_token': resp['access_token'],
            'refresh_token': resp['refresh_token'],
            'expires_on': int(resp['expires_on'])
        }
        with open(onedrive._TOKEN_FILE, 'w', encoding='utf-8') as f:
            json.dump(token, f)
        return token

    def read_token(self, only_read=False):
        """Load the cached token, running the interactive flow if none exists.

        Args:
            only_read: When true, return the token as stored without checking
                its expiry.

        Returns:
            The token dict, or ``False`` when obtaining/refreshing it failed.
        """
        if os.path.exists(self._TOKEN_FILE):
            with open(self._TOKEN_FILE, 'r', encoding='utf-8') as f:
                token = json.load(f)
        else:
            self.get_code()
            token = self.get_token(input('请输入Url:'))
        if only_read or not token:
            return token
        if token['expires_on'] <= int(time.time()):
            token = self.refresh_token()
        return token

    def get_path(self, path, op):
        """Build the Graph URL addressing ``op`` on the item at ``path``.

        Args:
            path: Item path relative to the drive root; surrounding slashes
                are ignored. An empty path (or ``'/'``) addresses the root.
            op: Operation/segment appended to the item, e.g. ``'children'``.

        Returns:
            ``<drive>/root:/<path>:/<op>``, or ``<drive>/root/<op>`` for the
            drive root.
        """
        path = path.strip('/')
        op = op.lstrip('/')
        if not path:
            # The ``root:/path:`` addressing syntax needs a non-empty path.
            suffix = '/' + op if op else ''
            return self.onedrive_uri + '/root' + suffix
        return self.onedrive_uri + '/root:/{}:/{}'.format(path, op)

    def create_folder(self, path):
        """Create the folder named by the last component of ``path``.

        Args:
            path: Slash-separated folder path relative to the drive root.

        Returns:
            The HTTP status code of the create request.

        Raises:
            ValueError: If ``path`` contains no folder name.
        """
        parts = [part for part in path.split('/') if part]
        if not parts:
            raise ValueError('path must contain a folder name')
        parent = '/'.join(parts[:-1])
        body = json.dumps({
            "name": parts[-1],
            "folder": {},
        })
        # The body is JSON, so it must not be labelled as form data.
        r = requests.post(self.get_path(parent, 'children'),
                          headers=self._graph_headers('application/json'),
                          data=body)
        return r.status_code

    def upload_url(self, path, conflict="fail"):
        """Create an upload session for ``path`` and return its upload URL.

        Args:
            path: Destination path of the file relative to the drive root.
            conflict: Accepted for backward compatibility; currently not sent
                to the service.

        Returns:
            The session's ``uploadUrl``, or ``""`` when the request failed.
        """
        r = requests.post(self.get_path(path, 'createUploadSession'),
                          headers=self._graph_headers())
        if r.status_code == 200:
            return r.json()['uploadUrl']
        # Only failures are printed: the upload URL itself is a credential.
        print(r.text)
        return ""

    def upload_file(self, path, data):
        """Upload ``data`` to ``path``, choosing the upload method by size.

        Args:
            path: Destination path of the file relative to the drive root.
            data: File content as a bytes-like object.

        Returns:
            ``"上传成功"`` on success, ``"上传失败"`` on failure, or
            ``"上传取消"`` when no upload session could be created.
        """
        try:
            if len(data) > self._SIMPLE_UPLOAD_LIMIT:
                return self.upload_big_file(path, data)
            r = requests.put(self.get_path(path, 'content'),
                             headers=self._graph_headers('application/octet-stream'),
                             data=data)
        except Exception:
            # Best-effort API: report the failure instead of raising.
            return "上传失败"
        # Judge by status code; a Response object cannot be searched for 'error'.
        if r.status_code in (200, 201):
            return "上传成功"
        return "上传失败"

    def upload_big_file(self, path, data):
        """Upload ``data`` in chunks through an upload session.

        Args:
            path: Destination path of the file relative to the drive root.
            data: File content as a bytes-like object.

        Returns:
            ``"上传成功"`` when every chunk was accepted, ``"上传失败"`` when
            a chunk was rejected, ``"上传取消"`` when no session was created.
        """
        url = self.upload_url(path)
        if url == "":
            return "上传取消"
        size = len(data)
        file_name = path.split('/')[-1]
        # The context manager closes the progress bar on every exit path.
        with tqdm(total=size, leave=False, unit='B', unit_scale=True, desc=file_name) as pbar:
            for start in range(0, size, self._CHUNK_SIZE):
                chunk_data = data[start:start + self._CHUNK_SIZE]
                r = requests.put(url, headers={
                    'Content-Length': str(len(chunk_data)),
                    'Content-Range': 'bytes {}-{}/{}'.format(
                        start, start + len(chunk_data) - 1, size)
                }, data=chunk_data)
                if r.status_code not in (200, 201, 202):
                    print("上传出错")
                    return "上传失败"
                pbar.update(len(chunk_data))
        return "上传成功"

    def check_file_exists(self, file_name):
        """Report whether an item exists at ``file_name`` in the drive.

        Args:
            file_name: Item path relative to the drive root.

        Returns:
            ``False`` only when the service answers 404. Any other status, and
            any request or token failure, is reported as ``True``.
        """
        try:
            response = requests.get(self.get_path(file_name, ''),
                                    headers=self._graph_headers())
        except (requests.RequestException, RuntimeError):
            return True
        return response.status_code != 404

def zip_dir(directory, zip_filename):
    """Compress ``directory`` (recursively) into a ZIP archive.

    Entries are stored relative to the parent of ``directory``, so every
    archive member name starts with the directory's own name.

    Args:
        directory: Path of the directory to compress.
        zip_filename: Path of the archive to create.

    Returns:
        ``zip_filename``, i.e. the path the archive was actually written to.
    """
    archive_path = os.path.abspath(zip_filename)
    parent = os.path.join(directory, os.pardir)
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _dirs, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)
                # The archive may live inside the directory being walked;
                # never add the half-written archive to itself.
                if os.path.abspath(file_path) == archive_path:
                    continue
                zipf.write(file_path, os.path.relpath(file_path, parent))
    return zip_filename

# Placeholder credentials: replace each string with the corresponding value
# from the Azure app registration before running.
# NOTE(review): avoid committing real secrets to source control.
client_id = '应用程序(客户端) ID'
client_secret = '客户端密码'
# Not referenced by any code visible in this file (the class uses the
# "common" login endpoint).
tenant_id = '目录(租户) ID'
© 版权声明
THE END
喜欢就支持一下吧
点赞8 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容