solve task execution issues (#90)

KevinHuSh 2024-03-01 19:48:01 +08:00 committed by GitHub
parent 7f174fb9d3
commit 8a726fb04b
16 changed files with 89 additions and 87 deletions

View File

@@ -236,13 +236,16 @@ def run():
     try:
         for id in req["doc_ids"]:
             info = {"run": str(req["run"]), "progress": 0}
-            if str(req["run"]) == TaskStatus.RUNNING.value:info["progress_msg"] = ""
+            if str(req["run"]) == TaskStatus.RUNNING.value:
+                info["progress_msg"] = ""
+                info["chunk_num"] = 0
+                info["token_num"] = 0
             DocumentService.update_by_id(id, info)
-            if str(req["run"]) == TaskStatus.CANCEL.value:
+            #if str(req["run"]) == TaskStatus.CANCEL.value:
             tenant_id = DocumentService.get_tenant_id(id)
             if not tenant_id:
                 return get_data_error_result(retmsg="Tenant not found!")
             ELASTICSEARCH.deleteByQuery(Q("match", doc_id=id), idxnm=search.index_name(tenant_id))
         return get_json_result(data=True)
     except Exception as e:
@@ -311,13 +314,17 @@ def change_parser():
         if doc.type == FileType.VISUAL or re.search(r"\.(ppt|pptx|pages)$", doc.name):
             return get_data_error_result(retmsg="Not supported yet!")
-        e = DocumentService.update_by_id(doc.id, {"parser_id": req["parser_id"], "progress":0, "progress_msg": ""})
+        e = DocumentService.update_by_id(doc.id, {"parser_id": req["parser_id"], "progress":0, "progress_msg": "", "run": "0"})
         if not e:
             return get_data_error_result(retmsg="Document not found!")
         if doc.token_num>0:
             e = DocumentService.increment_chunk_num(doc.id, doc.kb_id, doc.token_num*-1, doc.chunk_num*-1, doc.process_duation*-1)
             if not e:
                 return get_data_error_result(retmsg="Document not found!")
+        tenant_id = DocumentService.get_tenant_id(req["doc_id"])
+        if not tenant_id:
+            return get_data_error_result(retmsg="Tenant not found!")
+        ELASTICSEARCH.deleteByQuery(Q("match", doc_id=doc.id), idxnm=search.index_name(tenant_id))
         return get_json_result(data=True)
     except Exception as e:

View File

@@ -65,7 +65,7 @@ class TaskService(CommonService):
         try:
             task = cls.model.get_by_id(id)
             _, doc = DocumentService.get_by_id(task.doc_id)
-            return doc.run == TaskStatus.CANCEL.value
+            return doc.run == TaskStatus.CANCEL.value or doc.progress < 0
         except Exception as e:
             pass
         return True

View File

@@ -98,15 +98,6 @@ PROXY_PROTOCOL = get_base_config(RAG_FLOW_SERVICE_NAME, {}).get("protocol")
 DATABASE = decrypt_database_config(name="mysql")
-# Logger
-LoggerFactory.set_directory(os.path.join(get_project_base_directory(), "logs", "api"))
-# {CRITICAL: 50, FATAL:50, ERROR:40, WARNING:30, WARN:30, INFO:20, DEBUG:10, NOTSET:0}
-LoggerFactory.LEVEL = 10
-stat_logger = getLogger("stat")
-access_logger = getLogger("access")
-database_logger = getLogger("database")
 # Switch
 # upload
 UPLOAD_DATA_FROM_CLIENT = True
@@ -144,6 +135,15 @@ CHECK_NODES_IDENTITY = False
 retrievaler = search.Dealer(ELASTICSEARCH)
+# Logger
+LoggerFactory.set_directory(os.path.join(get_project_base_directory(), "logs", "api"))
+# {CRITICAL: 50, FATAL:50, ERROR:40, WARNING:30, WARN:30, INFO:20, DEBUG:10, NOTSET:0}
+LoggerFactory.LEVEL = 10
+stat_logger = getLogger("stat")
+access_logger = getLogger("access")
+database_logger = getLogger("database")
 class CustomEnum(Enum):
     @classmethod
     def valid(cls, value):

View File

@@ -8,7 +8,7 @@ import torch
 import re
 import pdfplumber
 import logging
-from PIL import Image
+from PIL import Image, ImageDraw
 import numpy as np
 from api.db import ParserType
@@ -930,13 +930,25 @@ class HuParser:
     def crop(self, text, ZM=3):
         imgs = []
+        poss = []
         for tag in re.findall(r"@@[0-9-]+\t[0-9.\t]+##", text):
             pn, left, right, top, bottom = tag.strip(
                 "#").strip("@").split("\t")
             left, right, top, bottom = float(left), float(
                 right), float(top), float(bottom)
+            poss.append(([int(p) - 1 for p in pn.split("-")], left, right, top, bottom))
+        if not poss: return
+        max_width = np.max([right-left for (_, left, right, _, _) in poss])
+        GAP = 6
+        pos = poss[0]
+        poss.insert(0, ([pos[0][0]], pos[1], pos[2], max(0, pos[3]-120), max(pos[3]-GAP, 0)))
+        pos = poss[-1]
+        poss.append(([pos[0][-1]], pos[1], pos[2], min(self.page_images[pos[0][-1]].size[1]/ZM, pos[4]+GAP), min(self.page_images[pos[0][-1]].size[1]/ZM, pos[4]+120)))
+        for ii, (pns, left, right, top, bottom) in enumerate(poss):
+            right = left + max_width
             bottom *= ZM
-            pns = [int(p) - 1 for p in pn.split("-")]
             for pn in pns[1:]:
                 bottom += self.page_images[pn - 1].size[1]
             imgs.append(
@@ -959,16 +971,21 @@ class HuParser:
         if not imgs:
             return
-        GAP = 2
         height = 0
         for img in imgs:
             height += img.size[1] + GAP
         height = int(height)
+        width = int(np.max([i.size[0] for i in imgs]))
         pic = Image.new("RGB",
-                        (int(np.max([i.size[0] for i in imgs])), height),
+                        (width, height),
                         (245, 245, 245))
         height = 0
-        for img in imgs:
+        for ii, img in enumerate(imgs):
+            if ii == 0 or ii + 1 == len(imgs):
+                img = img.convert('RGBA')
+                overlay = Image.new('RGBA', img.size, (0, 0, 0, 0))
+                overlay.putalpha(128)
+                img = Image.alpha_composite(img, overlay).convert("RGB")
             pic.paste(img, (0, int(height)))
             height += img.size[1] + GAP
         return pic

View File

@@ -34,7 +34,7 @@ class LayoutRecognizer(Recognizer):
         "Equation",
     ]
     def __init__(self, domain):
-        super().__init__(self.labels, domain) #, os.path.join(get_project_base_directory(), "rag/res/deepdoc/"))
+        super().__init__(self.labels, domain, os.path.join(get_project_base_directory(), "rag/res/deepdoc/"))
     def __call__(self, image_list, ocr_res, scale_factor=3, thr=0.2, batch_size=16):
         def __is_garbage(b):

View File

@@ -33,7 +33,7 @@ class TableStructureRecognizer(Recognizer):
     ]
     def __init__(self):
-        super().__init__(self.labels, "tsr")#,os.path.join(get_project_base_directory(), "rag/res/deepdoc/"))
+        super().__init__(self.labels, "tsr", os.path.join(get_project_base_directory(), "rag/res/deepdoc/"))
     def __call__(self, images, thr=0.2):
         tbls = super().__call__(images, thr)

View File

@@ -13,7 +13,7 @@
 import copy
 import re
 from rag.nlp import bullets_category, is_english, tokenize, remove_contents_table, \
-    hierarchical_merge, make_colon_as_title, naive_merge, random_choices
+    hierarchical_merge, make_colon_as_title, naive_merge, random_choices, tokenize_table
 from rag.nlp import huqie
 from deepdoc.parser import PdfParser, DocxParser
@@ -90,25 +90,16 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
     make_colon_as_title(sections)
     bull = bullets_category([t for t in random_choices([t for t,_ in sections], k=100)])
     if bull >= 0: cks = hierarchical_merge(bull, sections, 3)
-    else: cks = naive_merge(sections, kwargs.get("chunk_token_num", 256), kwargs.get("delimer", "\n。;!?"))
+    else:
+        sections = [s.split("@") for s in sections]
+        sections = [(pr[0], "@"+pr[1]) for pr in sections if len(pr)==2]
+        cks = naive_merge(sections, kwargs.get("chunk_token_num", 256), kwargs.get("delimer", "\n。;!?"))
+    sections = [t for t, _ in sections]
     # is it English
-    eng = lang.lower() == "english"#is_english(random_choices(sections, k=218))
+    eng = lang.lower() == "english"#is_english(random_choices([t for t, _ in sections], k=218))
-    res = []
-    # add tables
-    for img, rows in tbls:
-        bs = 10
-        de = ";" if eng else "；"
-        for i in range(0, len(rows), bs):
-            d = copy.deepcopy(doc)
-            r = de.join(rows[i:i + bs])
-            r = re.sub(r"\t——(来自| in ).*”%s" % de, "", r)
-            tokenize(d, r, eng)
-            d["image"] = img
-            res.append(d)
-            print("TABLE", d["content_with_weight"])
+    res = tokenize_table(tbls, doc, eng)
     # wrap up to es documents
     for ck in cks:
         d = copy.deepcopy(doc)

View File

@@ -2,7 +2,7 @@ import copy
 import re
 from api.db import ParserType
-from rag.nlp import huqie, tokenize
+from rag.nlp import huqie, tokenize, tokenize_table
 from deepdoc.parser import PdfParser
 from rag.utils import num_tokens_from_string
@@ -81,18 +81,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
     # is it English
     eng = lang.lower() == "english"#pdf_parser.is_english
-    res = []
-    # add tables
-    for img, rows in tbls:
-        bs = 10
-        de = ";" if eng else "；"
-        for i in range(0, len(rows), bs):
-            d = copy.deepcopy(doc)
-            r = de.join(rows[i:i + bs])
-            r = re.sub(r"\t——(来自| in ).*”%s" % de, "", r)
-            tokenize(d, r, eng)
-            d["image"] = img
-            res.append(d)
+    res = tokenize_table(tbls, doc, eng)
     i = 0
     chunk = []

View File

@@ -13,7 +13,7 @@
 import copy
 import re
 from rag.app import laws
-from rag.nlp import huqie, is_english, tokenize, naive_merge
+from rag.nlp import huqie, is_english, tokenize, naive_merge, tokenize_table
 from deepdoc.parser import PdfParser
 from rag.settings import cron_logger
@@ -72,17 +72,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
         pdf_parser = Pdf()
         sections, tbls = pdf_parser(filename if not binary else binary,
                                     from_page=from_page, to_page=to_page, callback=callback)
-        # add tables
-        for img, rows in tbls:
-            bs = 10
-            de = ";" if eng else "；"
-            for i in range(0, len(rows), bs):
-                d = copy.deepcopy(doc)
-                r = de.join(rows[i:i + bs])
-                r = re.sub(r"\t——(来自| in ).*”%s" % de, "", r)
-                tokenize(d, r, eng)
-                d["image"] = img
-                res.append(d)
+        res = tokenize_table(tbls, doc, eng)
     elif re.search(r"\.txt$", filename, re.IGNORECASE):
         callback(0.1, "Start to parse.")
         txt = ""
@@ -106,6 +96,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
     # wrap up to es documents
     for ck in cks:
         print("--", ck)
+        if not ck:continue
         d = copy.deepcopy(doc)
         if pdf_parser:
             d["image"] = pdf_parser.crop(ck)

View File

@@ -15,7 +15,7 @@ import re
 from collections import Counter
 from api.db import ParserType
-from rag.nlp import huqie, tokenize
+from rag.nlp import huqie, tokenize, tokenize_table
 from deepdoc.parser import PdfParser
 import numpy as np
 from rag.utils import num_tokens_from_string
@@ -158,18 +158,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
     eng = lang.lower() == "english"#pdf_parser.is_english
     print("It's English.....", eng)
-    res = []
-    # add tables
-    for img, rows in paper["tables"]:
-        bs = 10
-        de = ";" if eng else "；"
-        for i in range(0, len(rows), bs):
-            d = copy.deepcopy(doc)
-            r = de.join(rows[i:i + bs])
-            r = re.sub(r"\t——(来自| in ).*”%s" % de, "", r)
-            tokenize(d, r)
-            d["image"] = img
-            res.append(d)
+    res = tokenize_table(paper["tables"], doc, eng)
     if paper["abstract"]:
         d = copy.deepcopy(doc)

View File

@@ -20,7 +20,7 @@ from deepdoc.parser import PdfParser, PptParser
 class Ppt(PptParser):
     def __call__(self, fnm, from_page, to_page, callback=None):
-        txts = super.__call__(fnm, from_page, to_page)
+        txts = super().__call__(fnm, from_page, to_page)
         callback(0.5, "Text extraction finished.")
         import aspose.slides as slides

View File

@@ -79,7 +79,7 @@ def chunk(filename, binary=None, callback=None, **kwargs):
     resume = remote_call(filename, binary)
     if len(resume.keys()) < 7:
         callback(-1, "Resume is not successfully parsed.")
-        return []
+        raise Exception("Resume parser remote call fail!")
     callback(0.6, "Done parsing. Chunking...")
     print(json.dumps(resume, ensure_ascii=False, indent=2))

View File

@@ -1,4 +1,4 @@
+import copy
 from nltk.stem import PorterStemmer
 stemmer = PorterStemmer()
@@ -80,6 +80,20 @@ def tokenize(d, t, eng):
     d["content_sm_ltks"] = huqie.qieqie(d["content_ltks"])
+def tokenize_table(tbls, doc, eng, batch_size=10):
+    res = []
+    # add tables
+    for img, rows in tbls:
+        de = "; " if eng else "； "
+        for i in range(0, len(rows), batch_size):
+            d = copy.deepcopy(doc)
+            r = de.join(rows[i:i + batch_size])
+            tokenize(d, r, eng)
+            d["image"] = img
+            res.append(d)
+    return res
 def remove_contents_table(sections, eng=False):
     i = 0
     while i < len(sections):
@@ -201,10 +215,12 @@ def naive_merge(sections, chunk_token_num=128, delimiter="\n。"):
         tnum = num_tokens_from_string(t)
         if tnum < 8: pos = ""
         if tk_nums[-1] > chunk_token_num:
-            cks.append(t + pos)
+            if t.find(pos) < 0: t += pos
+            cks.append(t)
             tk_nums.append(tnum)
         else:
-            cks[-1] += t + pos
+            if cks[-1].find(pos) < 0: t += pos
+            cks[-1] += t
             tk_nums[-1] += tnum
     for sec, pos in sections:

View File

@@ -1,6 +1,8 @@
 # -*- coding: utf-8 -*-
 import json
 import re
+from copy import deepcopy
 from elasticsearch_dsl import Q, Search
 from typing import List, Optional, Dict, Union
 from dataclasses import dataclass
@@ -98,7 +100,7 @@ class Dealer:
             del s["highlight"]
         q_vec = s["knn"]["query_vector"]
         es_logger.info("【Q】: {}".format(json.dumps(s)))
-        res = self.es.search(s, idxnm=idxnm, timeout="600s", src=src)
+        res = self.es.search(deepcopy(s), idxnm=idxnm, timeout="600s", src=src)
         es_logger.info("TOTAL: {}".format(self.es.getTotal(res)))
         if self.es.getTotal(res) == 0 and "knn" in s:
             bqry, _ = self.qryr.question(qst, min_match="10%")

View File

@@ -90,7 +90,7 @@ def dispatch():
                 tsks.append(task)
             else:
                 tsks.append(new_task())
+        print(tsks)
         bulk_insert_into_db(Task, tsks, True)
         set_dispatching(r["id"])
         tmf.write(str(r["update_time"]) + "\n")

View File

@@ -114,7 +114,7 @@ def build(row):
                             kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"])
     except Exception as e:
         if re.search("(No such file|not found)", str(e)):
-            callback(-1, "Can not find file <%s>" % row["doc_name"])
+            callback(-1, "Can not find file <%s>" % row["name"])
         else:
             callback(-1, f"Internal server error: %s" %
                      str(e).replace("'", ""))