"""Small client for the Doc2X v2 HTTP API.

Workflow: request a pre-signed upload URL, upload a PDF, poll until parsing
finishes, then request an export (e.g. docx) and download the exported file.
"""
import datetime
import json
import os
import time

import requests as rq

base_url = "https://v2.doc2x.noedgeai.com"

# (connect, read) timeouts in seconds. Without a timeout, requests can block
# forever on a stalled connection.
REQUEST_TIMEOUT = (10, 120)
# Delay between two consecutive status polls, in seconds.
POLL_INTERVAL_SECONDS = 3

# The API key is read from the first of these two locations that exists.
key_file = "E:/华为云盘/doc2x_key.txt"
if not os.path.exists(key_file):
    key_file = "D:/华为云盘/doc2x_key.txt"
with open(key_file, "r") as f:
    secret = f.read().strip()


def _auth_headers() -> dict:
    """Return the bearer-token header sent with every API request."""
    return {"Authorization": f"Bearer {secret}"}


def _strip_quotes(path: str) -> str:
    """Remove surrounding whitespace and quote characters from a pasted path.

    Only the ends are trimmed, so a quote character inside the path
    (e.g. an apostrophe in a folder name) is left untouched.
    """
    return path.strip().strip("\"'")


def _extract_data(res, action: str):
    """Return the ``data`` member of a successful API response.

    Raises:
        Exception: if the HTTP status is not 200 or the response ``code``
            is not ``"success"``. The message starts with ``"<action> failed: "``.
    """
    if res.status_code != 200:
        raise Exception(f"{action} failed: {res.text}")
    data = res.json()
    if data["code"] != "success":
        raise Exception(f"{action} failed: {data}")
    return data["data"]


def preupload():
    """Ask the API for an upload slot.

    Returns:
        The ``data`` object of the response; ``process_pdf`` reads its
        ``"url"`` and ``"uid"`` entries.

    Raises:
        Exception: if the request fails or the API does not report success.
    """
    url = f"{base_url}/api/v2/parse/preupload"
    res = rq.post(url, headers=_auth_headers(), timeout=REQUEST_TIMEOUT)
    return _extract_data(res, "get preupload url")


def put_file(pdf_path: str, url: str):
    """Upload the file at *pdf_path* to *url* with an HTTP PUT.

    Raises:
        Exception: if the server answers with a status other than 200.
    """
    with open(pdf_path, "rb") as f:
        # The request body is the raw binary stream of the file.
        res = rq.put(url, data=f, timeout=REQUEST_TIMEOUT)
    if res.status_code != 200:
        raise Exception(f"put file failed: {res.text}")


def get_status(uid: str):
    """Fetch the parse status for the upload identified by *uid*.

    Returns:
        The ``data`` object of the response; ``process_pdf`` reads its
        ``"status"`` entry and, depending on it, ``"result"``, ``"detail"``
        or ``"progress"``.

    Raises:
        Exception: if the request fails or the API does not report success.
    """
    url = f"{base_url}/api/v2/parse/status?uid={uid}"
    res = rq.get(url, headers=_auth_headers(), timeout=REQUEST_TIMEOUT)
    return _extract_data(res, "get status")


def _wait_for_export(uid: str) -> str:
    """Poll the export endpoint until it is done and return the download URL.

    Raises:
        Exception: on a non-200 answer or any status other than
            ``"success"`` / ``"processing"``.
    """
    url = f"{base_url}/api/v2/convert/parse/result?uid={uid}"
    while True:
        res = rq.get(url, headers=_auth_headers(), timeout=REQUEST_TIMEOUT)
        if res.status_code != 200:
            raise Exception(f"get result failed: {res.text}")
        print(res.text)
        # Parse the body once per poll; tolerate a missing/null "data" member
        # so an error answer produces the explicit exception below.
        data = res.json().get("data") or {}
        status = data.get("status")
        if status == "success":
            return data["url"]
        if status != "processing":
            raise Exception(f"get result failed: {res.text}")
        time.sleep(POLL_INTERVAL_SECONDS)


def _save_download(content: bytes, download_path: str, filename: str) -> None:
    """Write *content* to *download_path*/*filename*, falling back to the CWD.

    The path is cleaned (quotes stripped, backslashes normalised) BEFORE the
    directory is created, so a quoted path does not create a bogus directory.
    """
    target_dir = _strip_quotes(download_path or "").replace("\\", "/")
    try:
        # An empty path makes makedirs raise, which triggers the fallback.
        os.makedirs(target_dir, exist_ok=True)
        if not target_dir.endswith("/"):
            target_dir += "/"
        with open(target_dir + filename, "wb") as f:
            f.write(content)
    except OSError as e:
        # Best effort: if the requested directory is unusable, keep the file
        # in the current working directory instead of losing the download.
        print(e)
        print('将文件保存在当前目录')
        with open(filename, "wb") as f:
            f.write(content)


def _remove_temp_result() -> None:
    """Delete the temporary ``result.json`` written by ``process_pdf``, if any."""
    try:
        os.remove("result.json")
    except FileNotFoundError:
        # parse_result may be called on its own, without process_pdf having
        # written result.json first; that is not an error.
        pass


def parse_result(uid: str, download_path: str, to: str):
    """Export a parsed document and download the exported file.

    Args:
        uid: identifier of a previously uploaded document.
        download_path: directory to save into; surrounding quotes are
            stripped. If it cannot be used, the file is saved in the current
            working directory.
        to: export format sent to the API. The saved file gets the ``docx``
            extension when ``to == "docx"`` and ``zip`` otherwise.

    Raises:
        Exception: if polling the export result fails or the download does
            not return HTTP 200.
    """
    headers = {**_auth_headers(), "Content-Type": "application/json"}
    payload = {
        "uid": uid,
        "to": to,
        "formula_mode": "normal",
        "filename": "output." + to,
    }
    response = rq.post(
        f"{base_url}/api/v2/convert/parse",
        headers=headers,
        data=json.dumps(payload),
        timeout=REQUEST_TIMEOUT,
    )
    print(response.text)

    download_url = _wait_for_export(uid)
    download_res = rq.get(download_url, timeout=REQUEST_TIMEOUT)
    if download_res.status_code != 200:
        # Do not write an error page to disk as if it were the document.
        raise Exception(f"download result failed: HTTP {download_res.status_code}")

    stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    extension = 'docx' if to == 'docx' else 'zip'
    filename = 'result' + stamp + '.' + extension

    _save_download(download_res.content, download_path, filename)
    _remove_temp_result()


def process_pdf(pdf_path: str, download_path: str, to: str = "docx"):
    """Upload a PDF, wait for parsing to finish, then export and download it.

    Args:
        pdf_path: path of the PDF to upload; surrounding quotes are stripped.
        download_path: directory for the exported file (see ``parse_result``).
        to: export format passed on to ``parse_result``.

    Raises:
        Exception: if the file does not exist, is not a ``.pdf``, or any API
            step reports a failure.
    """
    pdf_path = _strip_quotes(pdf_path)
    # Validate the local file first so no upload slot is requested for a
    # path that cannot be uploaded anyway.
    if not os.path.exists(pdf_path):
        raise Exception("file not exists")
    if not pdf_path.lower().endswith(".pdf"):
        raise Exception("file type not supported")

    upload_data = preupload()
    print(upload_data)
    url = upload_data["url"]
    uid = upload_data["uid"]
    print(f"uid: {uid}")
    put_file(pdf_path, url)

    while True:
        status_data = get_status(uid)
        print(status_data)
        status = status_data["status"]
        if status == "success":
            result = status_data["result"]
            with open("result.json", "w") as f:
                json.dump(result, f)
            break
        if status == "failed":
            detail = status_data["detail"]
            raise Exception(f"parse failed: {detail}")
        if status == "processing":
            progress = status_data["progress"]
            print(f"progress: {progress}")
        # Sleep for every non-terminal status, so an unexpected status value
        # cannot turn this loop into a busy loop hammering the API.
        time.sleep(POLL_INTERVAL_SECONDS)

    parse_result(uid, download_path, to)


if __name__ == "__main__":
    # Interactive usage, currently disabled:
    # pdf_path = input("pdf path: ")
    # download_path = input("download path: ")
    # process_pdf(pdf_path, download_path)
    uid = "0194413e-d82e-707c-b3ba-dd87e94a1d7f"
    parse_result(uid, 'E:/', 'docx')