Browse Source

finish

等支持翻译再优化吧
master
cgl 2 months ago
parent
commit
932410f619
  1. 153
      core.py

153
core.py

@ -0,0 +1,153 @@
import json
import time,datetime
import requests as rq
import os
# Root of the doc2x v2 HTTP API; endpoint paths are appended to this.
base_url = "https://v2.doc2x.noedgeai.com"

# The API key is read from a text file.  Start from the copy on D: and
# switch to the copy on E: when that one exists, so E: is preferred.
key_file = "D:/华为云盘/doc2x_key.txt"
if os.path.exists("E:/华为云盘/doc2x_key.txt"):
    key_file = "E:/华为云盘/doc2x_key.txt"

# Bearer token sent with the API requests; surrounding whitespace (for
# example a trailing newline) is removed.
with open(key_file, "r") as f:
    secret = f.read().strip()
def preupload():
    """Ask the parse API for an upload slot.

    Sends an authenticated POST to ``/api/v2/parse/preupload``.

    Returns:
        The ``data`` member of the JSON reply.  ``process_pdf`` reads its
        ``url`` and ``uid`` entries.

    Raises:
        Exception: if the HTTP status is not 200, or if the reply's ``code``
            field is not ``"success"``.
    """
    endpoint = f"{base_url}/api/v2/parse/preupload"
    auth = {"Authorization": f"Bearer {secret}"}
    reply = rq.post(endpoint, headers=auth)

    # Transport-level failure first, then API-level failure.
    if reply.status_code != 200:
        raise Exception(f"get preupload url failed: {reply.text}")
    payload = reply.json()
    if payload["code"] != "success":
        raise Exception(f"get preupload url failed: {payload}")
    return payload["data"]
def put_file(pdf_path: str, url: str):
    """Upload the file at *pdf_path* to *url* with an HTTP PUT.

    Args:
        pdf_path: Path of the local file to send.
        url: Destination URL for the PUT request.

    Raises:
        Exception: if the response status is anything other than 200.
    """
    with open(pdf_path, "rb") as stream:
        # The request body is the file's binary stream.
        upload = rq.put(url, data=stream)
        if upload.status_code != 200:
            raise Exception(f"put file failed: {upload.text}")
def get_status(uid: str):
    """Fetch the current parse status for the job identified by *uid*.

    Sends an authenticated GET to ``/api/v2/parse/status`` with *uid* as the
    ``uid`` query parameter.

    Returns:
        The ``data`` member of the JSON reply.  ``process_pdf`` reads its
        ``status`` entry and, depending on that, ``result``, ``detail`` or
        ``progress``.

    Raises:
        Exception: if the HTTP status is not 200, or if the reply's ``code``
            field is not ``"success"``.
    """
    endpoint = f"{base_url}/api/v2/parse/status?uid={uid}"
    auth = {"Authorization": f"Bearer {secret}"}
    reply = rq.get(endpoint, headers=auth)

    # Transport-level failure first, then API-level failure.
    if reply.status_code != 200:
        raise Exception(f"get status failed: {reply.text}")
    payload = reply.json()
    if payload["code"] != "success":
        raise Exception(f"get status failed: {payload}")
    return payload["data"]
def parse_result(uid: str, download_path: str, to: str):
    """Convert an already-parsed job to *to* and save the converted file.

    Steps:
      1. POST a conversion request for *uid* to ``/api/v2/convert/parse``.
      2. Poll ``/api/v2/convert/parse/result`` every 3 seconds while the
         reported status is ``"processing"``.
      3. Download the file from the URL in the final reply and write it as
         ``result<YYYYmmdd_HHMMSS>.<ext>``, where ``<ext>`` is ``docx`` when
         *to* is ``"docx"`` and ``zip`` for every other value.
      4. Delete ``result.json`` from the working directory if it is there.

    Args:
        uid: Job identifier (``process_pdf`` passes the ``uid`` it got from
            ``preupload``).
        download_path: Directory to save into.  Surrounding whitespace and
            quotes are removed and backslashes become forward slashes before
            the directory is created.  If the directory cannot be created or
            written to, the file is saved in the working directory instead.
            An empty value saves in the working directory directly.
        to: Target format, sent to the API as-is.

    Raises:
        Exception: if the conversion request, a poll, or the download does
            not return HTTP 200, or if a poll reports a status other than
            ``"success"`` or ``"processing"``.
    """
    auth = {"Authorization": f"Bearer {secret}"}

    convert_request = {
        "uid": uid,
        "to": to,
        "formula_mode": "normal",
        "filename": "output." + to,
    }
    response = rq.post(
        f"{base_url}/api/v2/convert/parse",
        headers={**auth, "Content-Type": "application/json"},
        data=json.dumps(convert_request),
    )
    print(response.text)
    # Stop here if the request was rejected instead of polling a job that
    # was never started.
    if response.status_code != 200:
        raise Exception(f"convert request failed: {response.text}")

    url_result = f"{base_url}/api/v2/convert/parse/result?uid={uid}"
    while True:
        result_status = rq.get(url_result, headers=auth)
        if result_status.status_code != 200:
            raise Exception(f"get result failed: {result_status.text}")
        print(result_status.text)
        # Decode the body once per poll rather than once per comparison.
        result_data = result_status.json()["data"]
        if result_data["status"] == "success":
            download_url = result_data["url"]
            break
        if result_data["status"] != "processing":
            raise Exception(f"get result failed: {result_status.text}")
        time.sleep(3)

    download_res = rq.get(download_url)
    # Do not write an error page to disk under a .docx/.zip name.
    if download_res.status_code != 200:
        raise Exception(f"download result failed: {download_res.text}")

    dt = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    file_name = f"result{dt}.{'docx' if to == 'docx' else 'zip'}"

    # Clean the directory BEFORE touching the filesystem, so that a quoted
    # path is not passed to makedirs in its raw form.  Only surrounding
    # quotes are removed; an apostrophe inside the path is left alone.
    target_dir = (download_path or "").strip().strip("\"'").replace('\\', '/')
    if target_dir and not target_dir.endswith('/'):
        target_dir += '/'

    try:
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        with open(target_dir + file_name, 'wb') as f:
            f.write(download_res.content)
    except OSError as e:
        # Best-effort fallback: report the problem and save next to the
        # script's working directory instead.
        print(e)
        print('将文件保存在当前目录')
        with open(file_name, 'wb') as f:
            f.write(download_res.content)

    # process_pdf leaves a scratch result.json in the working directory.
    # It does not exist when this function is called directly (as the
    # __main__ block does), and that must neither trigger the fallback save
    # nor abort the run.
    try:
        os.remove('result.json')
    except FileNotFoundError:
        pass
def process_pdf(pdf_path: str, download_path: str, to: str = "docx"):
    """Upload a PDF, wait for it to be parsed, then convert and download it.

    The local file is validated before any API call is made, so no upload
    slot is requested for a path that cannot be used.

    Args:
        pdf_path: Path of the PDF to process.  Surrounding whitespace and
            quotes are removed first.  The ``.pdf`` extension check ignores
            letter case.
        download_path: Directory for the converted file; passed unchanged to
            ``parse_result``.
        to: Target format passed to ``parse_result``; defaults to ``"docx"``.

    Raises:
        Exception: ``"file not exists"`` if the path does not exist,
            ``"file type not supported"`` if it does not end in ``.pdf``,
            ``"parse failed: ..."`` if the API reports status ``"failed"``,
            plus anything raised by ``preupload``, ``put_file``,
            ``get_status`` or ``parse_result``.
    """
    # Remove only surrounding quotes; an apostrophe inside the path is kept.
    pdf_path = pdf_path.strip().strip("\"'")
    if not os.path.exists(pdf_path):
        raise Exception("file not exists")
    if not pdf_path.lower().endswith(".pdf"):
        raise Exception("file type not supported")

    upload_data = preupload()
    print(upload_data)
    url = upload_data["url"]
    uid = upload_data["uid"]
    print(f"uid: {uid}")
    put_file(pdf_path, url)

    while True:
        status_data = get_status(uid)
        print(status_data)
        status = status_data["status"]
        if status == "success":
            # Scratch copy of the parse result; parse_result deletes it
            # after the converted file has been saved.
            with open("result.json", "w") as f:
                json.dump(status_data["result"], f)
            break
        if status == "failed":
            raise Exception(f"parse failed: {status_data['detail']}")
        if status == "processing":
            print(f"progress: {status_data['progress']}")
        # Wait between polls for every non-terminal status, so that an
        # unrecognised status value cannot turn this into a busy loop.
        time.sleep(3)

    parse_result(uid, download_path, to)
if __name__ == "__main__":
    # Interactive full pipeline (currently disabled):
    # pdf_path = input("pdf path: ")
    # download_path = input("download path: ")
    # process_pdf(pdf_path, download_path)

    # Run only the convert-and-download step for a fixed job uid, saving
    # the docx under E:/.
    uid = "0194413e-d82e-707c-b3ba-dd87e94a1d7f"
    parse_result(uid, download_path='E:/', to='docx')
Loading…
Cancel
Save