You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
153 lines
4.8 KiB
153 lines
4.8 KiB
import json
|
|
import time,datetime
|
|
import requests as rq
|
|
import os
|
|
|
|
# Root of the Doc2X v2 HTTP API; the request helpers below build their URLs on it.
base_url = "https://v2.doc2x.noedgeai.com"

# Location of the text file holding the API key. The E: drive copy is tried
# first; when it does not exist the D: drive path is used instead (without
# checking that it exists, so a missing file surfaces as an error on open).
key_file = "E:/华为云盘/doc2x_key.txt"
key_file = key_file if os.path.exists(key_file) else "D:/华为云盘/doc2x_key.txt"

# The key is sent as the Bearer token of every authenticated request, so
# surrounding whitespace and the trailing newline are trimmed here once.
with open(key_file) as key_stream:
    secret = key_stream.read().strip()
|
|
|
|
def preupload(timeout: float = 30):
    """Request an upload slot from the Doc2X pre-upload endpoint.

    Sends an authenticated POST to ``/api/v2/parse/preupload`` and returns the
    ``data`` member of the JSON reply. ``process_pdf`` reads the ``"url"`` and
    ``"uid"`` entries from that value.

    Args:
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled connection cannot block the script forever.

    Returns:
        The ``data`` member of the decoded JSON response.

    Raises:
        Exception: If the HTTP status is not 200, or the reply's ``code``
            field is missing or not ``"success"``.
    """
    url = f"{base_url}/api/v2/parse/preupload"
    headers = {"Authorization": f"Bearer {secret}"}

    res = rq.post(url, headers=headers, timeout=timeout)
    if res.status_code != 200:
        raise Exception(f"get preupload url failed: {res.text}")

    data = res.json()
    # .get() so a reply without "code" is reported with the full payload
    # instead of surfacing as a bare KeyError.
    if data.get("code") != "success":
        raise Exception(f"get preupload url failed: {data}")
    return data["data"]
|
|
|
|
def put_file(pdf_path: str, url: str):
    """Upload the file at *pdf_path* to *url* with an HTTP PUT.

    Args:
        pdf_path: Path of the local file to send.
        url: Destination URL for the PUT request.

    Raises:
        Exception: If the response status code is anything other than 200.
    """
    with open(pdf_path, "rb") as pdf_stream:
        # The request body is the file's binary stream.
        upload_response = rq.put(url, data=pdf_stream)

    if upload_response.status_code == 200:
        return
    raise Exception(f"put file failed: {upload_response.text}")
|
|
|
|
def get_status(uid: str, timeout: float = 30):
    """Fetch the current parse status for the task identified by *uid*.

    Sends an authenticated GET to ``/api/v2/parse/status`` and returns the
    ``data`` member of the JSON reply. ``process_pdf`` inspects its
    ``"status"`` entry (``"success"``, ``"failed"`` or ``"processing"``) and,
    depending on it, ``"result"``, ``"detail"`` or ``"progress"``.

    Args:
        uid: Task identifier obtained from ``preupload``.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled poll cannot block the script forever.

    Returns:
        The ``data`` member of the decoded JSON response.

    Raises:
        Exception: If the HTTP status is not 200, or the reply's ``code``
            field is missing or not ``"success"``.
    """
    url = f"{base_url}/api/v2/parse/status"
    headers = {"Authorization": f"Bearer {secret}"}

    # Let requests build the query string so the uid is always URL-encoded.
    res = rq.get(url, headers=headers, params={"uid": uid}, timeout=timeout)
    if res.status_code != 200:
        raise Exception(f"get status failed: {res.text}")

    data = res.json()
    if data.get("code") != "success":
        raise Exception(f"get status failed: {data}")
    return data["data"]
|
|
|
|
def _normalize_download_dir(download_path: str) -> str:
    """Return *download_path* unquoted, with forward slashes and a trailing '/'.

    An empty (or None) path yields ``''`` so the caller saves into the current
    working directory instead of the filesystem root.
    """
    cleaned = (download_path or '').strip()
    # Strip only a matching pair of wrapping quotes; a quote elsewhere in the
    # path (e.g. an apostrophe in a folder name) must be left alone.
    if len(cleaned) >= 2 and cleaned[0] == cleaned[-1] and cleaned[0] in "\"'":
        cleaned = cleaned[1:-1]
    cleaned = cleaned.replace('\\', '/')
    if cleaned and not cleaned.endswith('/'):
        cleaned += '/'
    return cleaned


def _wait_for_export(uid: str, timeout: float) -> str:
    """Poll the convert-result endpoint until the export is ready; return its URL."""
    url_result = f"{base_url}/api/v2/convert/parse/result"
    headers = {"Authorization": f"Bearer {secret}"}

    while True:
        result_status = rq.get(
            url_result, headers=headers, params={"uid": uid}, timeout=timeout
        )
        if result_status.status_code != 200:
            raise Exception(f"get result failed: {result_status.text}")
        print(result_status.text)

        # Decode the body once per poll instead of once per field access.
        state = result_status.json().get('data') or {}
        status = state.get('status')
        if status == 'success':
            return state['url']
        if status != 'processing':
            raise Exception(f"get result failed: {result_status.text}")
        time.sleep(3)


def parse_result(uid: str, download_path: str, to: str, timeout: float = 30):
    """Export a parsed document to format *to* and save it under *download_path*.

    Posts a convert request for *uid*, polls the result endpoint every three
    seconds until the export is ready, downloads it, and writes it to
    ``result<YYYYmmdd_HHMMSS>.<ext>``. The extension is ``docx`` when *to* is
    ``'docx'`` and ``zip`` for every other value. If the file cannot be
    written under *download_path*, it is written to the current working
    directory instead. Finally, the ``result.json`` scratch file that
    ``process_pdf`` leaves in the working directory is removed when present.

    Args:
        uid: Task identifier of an already parsed document.
        download_path: Target directory; may be wrapped in quotes and may use
            backslashes. It is created if missing.
        to: Export format sent to the API as the ``to`` field.
        timeout: Seconds passed to ``requests`` as the timeout of each call.

    Raises:
        Exception: If the convert request or the download is rejected, a poll
            returns a non-200 status, or the export ends in a state other
            than ``'success'``.
    """
    headers = {
        "Authorization": f"Bearer {secret}",
        "Content-Type": "application/json",
    }
    payload = {
        "uid": uid,
        "to": to,
        "formula_mode": "normal",
        "filename": "output." + to,
    }

    response = rq.post(
        f"{base_url}/api/v2/convert/parse",
        headers=headers,
        data=json.dumps(payload),
        timeout=timeout,
    )
    print(response.text)
    # Fail right away on a rejected request rather than polling a job that
    # was never started.
    if not response.ok:
        raise Exception(f"convert request failed: {response.text}")

    download_url = _wait_for_export(uid, timeout)

    download_res = rq.get(download_url, timeout=timeout)
    # Never write an HTTP error page to disk as if it were the document.
    if not download_res.ok:
        raise Exception(f"download failed: {download_res.status_code}")

    stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    extension = 'docx' if to == 'docx' else 'zip'
    file_name = 'result' + stamp + '.' + extension

    try:
        # Normalise before creating the directory so quotes never end up in
        # the directory name.
        target_dir = _normalize_download_dir(download_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        with open(target_dir + file_name, 'wb') as f:
            f.write(download_res.content)
    except (OSError, ValueError) as e:
        # Best effort: an unusable target directory must not lose the download.
        print(e)
        print('将文件保存在当前目录')
        with open(file_name, 'wb') as f:
            f.write(download_res.content)

    # result.json only exists when process_pdf ran first; a direct call to
    # parse_result has nothing to clean up, and that is not an error.
    try:
        os.remove('result.json')
    except FileNotFoundError:
        pass
|
|
|
|
def process_pdf(pdf_path: str, download_path: str, to: str = "docx"):
    """Upload a PDF, wait for it to be parsed, then export and save the result.

    The local file is validated first, so no upload slot is requested for a
    path that cannot be used. The file is then uploaded to the URL returned by
    ``preupload``, the parse status is polled every three seconds, the parse
    result is written to ``result.json`` in the working directory, and
    ``parse_result`` is called to export and download the document.

    Args:
        pdf_path: Path of the PDF to convert; may be wrapped in quotes.
        download_path: Directory handed to ``parse_result`` for the export.
        to: Export format handed to ``parse_result``.

    Raises:
        Exception: ``"file not exists"`` if *pdf_path* does not exist,
            ``"file type not supported"`` if it does not end in ``.pdf``
            (case-insensitive), or ``"parse failed: ..."`` if the service
            reports a failed parse. Errors from the helper calls propagate.
    """
    pdf_path = pdf_path.strip()
    # Strip only a matching pair of wrapping quotes; a quote elsewhere in the
    # path (e.g. an apostrophe in a file name) must be left alone.
    if len(pdf_path) >= 2 and pdf_path[0] == pdf_path[-1] and pdf_path[0] in "\"'":
        pdf_path = pdf_path[1:-1]

    if not os.path.exists(pdf_path):
        raise Exception("file not exists")
    if not pdf_path.lower().endswith(".pdf"):
        raise Exception("file type not supported")

    upload_data = preupload()
    print(upload_data)
    url = upload_data["url"]
    uid = upload_data["uid"]
    print(f"uid: {uid}")

    put_file(pdf_path, url)

    while True:
        status_data = get_status(uid)
        print(status_data)
        status = status_data["status"]

        if status == "success":
            with open("result.json", "w") as f:
                json.dump(status_data["result"], f)
            break
        if status == "failed":
            detail = status_data.get("detail")
            raise Exception(f"parse failed: {detail}")
        if status == "processing":
            progress = status_data.get("progress")
            print(f"progress: {progress}")

        # Sleep for every non-terminal status, so an unrecognised value does
        # not turn the loop into a tight spin against the API.
        time.sleep(3)

    parse_result(uid, download_path, to)
|
|
|
|
if __name__ == "__main__":
    # The interactive end-to-end flow is currently switched off:
    #   pdf_path = input("pdf path: ")
    #   download_path = input("download path: ")
    #   process_pdf(pdf_path, download_path)
    #
    # Instead, the export of one fixed task uid is fetched as a docx file
    # and saved under E:/.
    uid = "0194413e-d82e-707c-b3ba-dd87e94a1d7f"
    parse_result(uid, download_path='E:/', to='docx')
|
|
|
|
|
|
|
|
|