commit 424c30853c0e90240b6f01298abd716daeb62a7a Author: Blade He Date: Mon Aug 19 09:52:13 2024 -0500 initial diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cf4b335 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/.env +/log +/utils/__pycache__ +/__pycache__/*.pyc diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0089946 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +Flask==3.0.3 +flasgger==0.9.7.1 +PyMuPDF==1.24.4 +python-dotenv==1.0.1 +boto3==1.34.106 +tqdm==4.66.4 +openai==1.35.10 \ No newline at end of file diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/gpt_utils.py b/utils/gpt_utils.py new file mode 100644 index 0000000..463e11b --- /dev/null +++ b/utils/gpt_utils.py @@ -0,0 +1,121 @@ +# from transformers import GPT2TokenizerFast +import tiktoken +from openai import AzureOpenAI +import openai +import os +from time import sleep +import dotenv +# loads .env file with your OPENAI_API_KEY +dotenv.load_dotenv() + + +def set_environment_variables(engine=os.getenv("Engine_0613_16k")): + if engine.startswith('gpt4') or engine.startswith('gpt-4'): + openai.api_base = os.getenv("OPENAI_API_BASE_DC") + openai.api_key = os.getenv("OPENAI_API_KEY_GPT4") + elif engine.startswith('modc-stg-gpt4'): + openai.api_base = os.getenv("OPENAI_API_BASE_GPT4_MODC") + openai.api_key = os.getenv("OPENAI_API_KEY_GPT4_MODC") + elif engine.upper() == 'ENGINE_GPT4_TURBO': + openai.api_base = os.getenv("OPENAI_API_BASE_GPT4_TURBO") + openai.api_key = os.getenv("OPENAI_API_KEY_GPT4_TURBO") + elif engine.startswith('modc-stg-gpt35turbo16k'): + openai.api_base = os.getenv("OPENAI_API_BASE_GPT3_MODC") + openai.api_key = os.getenv("OPENAI_API_KEY_GPT3_MODC") + else: + openai.api_base = os.getenv("OPENAI_API_BASE") + openai.api_key = os.getenv("OPENAI_API_KEY") + openai.Engine = engine + openai.api_type = os.getenv("OPENAI_API_TYPE") + openai.api_version = os.getenv("OPENAI_API_VERSION") + +# tokenizer = GPT2TokenizerFast.from_pretrained("gpt2") +tokenizer = tiktoken.get_encoding("cl100k_base") + + +def get_embedding(text, engine=os.getenv("EMBEDDING_ENGINE")): + count = 0 + error = '' + while count < 5: + try: + if count > 0: + print(f'retrying the {count} time for getting text embedding...') + return openai.Embedding.create(input=text, engine=engine)['data'][0]['embedding'] + except Exception as e: + error = str(e) + print(error) + count += 1 + sleep(1) + + +def num_tokens_from_string(string: str) -> int: + """Returns the number of tokens in a text string.""" + num_tokens = len(tokenizer.encode(string)) + return num_tokens + + +def num_tokens_from_messages(messages, model="gpt-35-turbo-16k"): + """Returns the number of tokens used by a list of messages.""" + encoding = tiktoken.get_encoding("cl100k_base") + if model == "gpt-35-turbo-16k": + tokens_per_message = 4 # every message follows <|start|>{role/name}\n{content}<|end|>\n + tokens_per_name = -1 # if there's a name, the role is omitted + elif model == "gpt-4-32k": + tokens_per_message = 3 + tokens_per_name = 1 + else: + tokens_per_message = 3 + tokens_per_name = 1 + num_tokens = 0 + for message in messages: + num_tokens += tokens_per_message + for key, value in message.items(): + num_tokens += len(encoding.encode(value)) + if key == "name": + num_tokens += tokens_per_name + num_tokens += 3 # every reply is primed with <|start|>assistant<|message|> + return num_tokens + + +def chat(prompt: str, + engine = os.getenv("Engine_GPT4o"), + 
azure_endpoint=os.getenv("OPENAI_API_BASE_GPT4o"), + api_key=os.getenv("OPENAI_API_KEY_GPT4o"), + api_version=os.getenv("OPENAI_API_VERSION_GPT4o"), + temperature: float = 0.0): + client = AzureOpenAI( + azure_endpoint=azure_endpoint, + api_key=api_key, + api_version=api_version + ) + + count = 0 + error = '' + max_tokens = 4000 + request_timeout = 120 + while count < 8: + try: + if count > 0: + print(f'retrying the {count} time...') + response = client.chat.completions.create( + model=engine, + temperature=temperature, + max_tokens=max_tokens, + top_p=0.95, + frequency_penalty=0, + presence_penalty=0, + timeout=request_timeout, + stop=None, + messages=[ + {"role": "user", "content": prompt} + ] + ) + return response.choices[0].message.content, False + except Exception as e: + error = str(e) + print(f"error message: {error}") + if 'maximum context length' in error: + return error, True + count += 1 + sleep(3) + return error, True \ No newline at end of file diff --git a/utils/logger.py b/utils/logger.py new file mode 100644 index 0000000..2b3ecf5 --- /dev/null +++ b/utils/logger.py @@ -0,0 +1,44 @@ +import logging +import time +from logging.handlers import TimedRotatingFileHandler +import os + + +class Logger: + def __init__(self): + # log file folder + output_folder = r'./log/' + os.makedirs(output_folder, exist_ok=True) + # add self._log_filename to be ar_yyyyMMddHHmm.log + self._log_filename = os.path.join(output_folder, 'ar_{}.log'.format(time.strftime("%Y%m%d%H%M%S", time.localtime()))) + logging.basicConfig() + # log format + self._formatter = logging.Formatter('%(asctime)s - %(process)d - %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + self._logger = logging.getLogger() + # self.set_console_logger() + self.set_file_logger() + self._logger.setLevel(logging.INFO) + + def set_console_logger(self): + console_handler = logging.StreamHandler() + console_handler.setFormatter(self._formatter) + console_handler.setLevel(logging.INFO) + self._logger.addHandler(console_handler) + + def set_file_logger(self): + log_file_handler = TimedRotatingFileHandler(filename=self._log_filename, + when="D", + interval=1, + backupCount=3, + encoding='utf-8') + log_file_handler.setFormatter(self._formatter) + log_file_handler.setLevel(logging.INFO) + # log_file_handler.suffix = "%Y%m%d_%H%M%S.log" + self._logger.addHandler(log_file_handler) + + def get_logger(self): + return self._logger + + +logger = Logger().get_logger() diff --git a/utils/pdf_download.py b/utils/pdf_download.py new file mode 100644 index 0000000..b47027a --- /dev/null +++ b/utils/pdf_download.py @@ -0,0 +1,56 @@ +import boto3 +import time +import os +from utils.logger import logger +import dotenv +# loads .env file with your OPENAI_API_KEY +dotenv.load_dotenv() + +def try_ntimes(func, params, success_msg='', error_msg='', error_res=False, ntimes=1, interval=1): + count = 1 + while True: + try: + res = func(**params) + print(success_msg) + return res + except Exception as e: + if count == ntimes: + print(error_msg) + return error_res + print(f'Please Set AWS environment variables at first, error: {e}') + print("Having tried {} times and trying one more time...".format(count)) + time.sleep(interval) + count += 1 + +def download_pdf_from_documents_warehouse(pdf_directory: str, doc_id: str): + if pdf_directory is None or pdf_directory == "": + logger.error("pdf_directory is not provided") + return None + os.makedirs(pdf_directory, exist_ok=True) + + pdf_file_path = os.path.join(pdf_directory, f"{doc_id}.pdf") + + if 
os.path.exists(pdf_file_path): + logger.info(f"PDF file for {os.path.basename(pdf_file_path)} already exists. Skipping...") + return pdf_file_path + else: + ACCESS_KEY = os.getenv('ACCESS_KEY') + SECRET_KEY = os.getenv('SECRET_KEY') + + session = boto3.Session(aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY) + s3 = session.client('s3') + + # s3 = boto3.client('s3') + bucket_name = os.getenv('BUCKET_NAME') + + params = {'Bucket': bucket_name, 'Key': doc_id, 'Filename': pdf_file_path} + success_msg = f'file downloaded from S3 successfully: {doc_id}' + error_msg = f'failed to download file {doc_id} from S3' + error_res = '__process_failed__' + + res = try_ntimes(func=s3.download_file, params=params, + success_msg=success_msg, + error_msg=error_msg, error_res=error_res, + ntimes=3, interval=5) + + return pdf_file_path diff --git a/utils/pdf_util.py b/utils/pdf_util.py new file mode 100644 index 0000000..ab11998 --- /dev/null +++ b/utils/pdf_util.py @@ -0,0 +1,1190 @@ +# Import Libraries +from typing import Tuple +from io import BytesIO +import os +import argparse +import re +import fitz +import json +from traceback import print_exc +from tqdm import tqdm +from utils.similarity import Similarity + +from utils.logger import logger + + +class PDFUtil: + def __init__(self, pdf_file: str) -> None: + self.pdf_file = pdf_file + self.simple_pdf_file = os.path.basename(self.pdf_file) + self.is_valid_path() + self.similarity = Similarity() + + def is_valid_path(self): + """ + Validates the path inputted and checks whether it is a file path or a folder path + """ + if not self.pdf_file: + raise ValueError(f"Invalid Path") + if os.path.isfile(self.pdf_file) and self.pdf_file.endswith(".pdf"): + return True + else: + raise ValueError( + f"Invalid Path {self.pdf_file}, please input the correct pdf file path." 
+ ) + + def extract_info(self) -> Tuple[bool, dict]: + """ + Extracts file info + """ + logger.info(f"Extracting file info from {self.pdf_file}") + # Open the PDF + pdf_doc = fitz.open(self.pdf_file) + try: + pdf_encrypted = pdf_doc.isEncrypted + except: + pdf_encrypted = pdf_doc.is_encrypted + output = { + "File": self.pdf_file, + "Encrypted": ("True" if pdf_encrypted else "False"), + } + # If PDF is encrypted the file metadata cannot be extracted + if not pdf_encrypted: + for key, value in pdf_doc.metadata.items(): + output[key] = value + # To Display File Info + logger.info( + "## File Information ##################################################" + ) + logger.info("\n".join("{}:{}".format(i, j) for i, j in output.items())) + logger.info( + "######################################################################" + ) + pdf_doc.close() + return True, output + + def extract_text(self, output_file: str = None) -> Tuple[bool, str, dict]: + """ + Extracts text from PDF + """ + # Extract text + try: + logger.info(f"Extracting text from {self.pdf_file}") + text = "" + page_text_dict = {} + pdf_doc = fitz.open(self.pdf_file) + try: + pdf_encrypted = pdf_doc.isEncrypted + except: + pdf_encrypted = pdf_doc.is_encrypted + if pdf_encrypted: + pdf_doc.authenticate("") + for page in pdf_doc: + page_text = page.get_text() + text += page_text + "\n" + page_text_dict[page.number] = page_text + # To Display Extracted Text + # logger.info( + # "## Extracted Text ####################################################" + # ) + # logger.info(text) + # logger.info( + # "######################################################################" + # ) + # Save to file + if output_file: + with open(output_file, "w", encoding="utf-8") as file: + file.write(text.strip()) + pdf_doc.close() + return True, text, page_text_dict + except Exception as e: + logger.error(f"Error extracting text: {e}") + print_exc() + return False, str(e), {} + + def parse_blocks_page(self, page: fitz.Page): + blocks = page.get_text("blocks") + list_of_blocks = [] + for block in blocks: + x0, y0, x1, y1, lines_in_the_block, block_no, block_type = block + list_of_blocks.append( + { + "bbox": [x0, y0, x1, y1], + "lines_in_the_block": lines_in_the_block, + "block_no": block_no, + "block_type": block_type, + } + ) + return list_of_blocks + + def parse_all_blocks(self): + pdf_doc = fitz.open(self.pdf_file) + try: + pdf_encrypted = pdf_doc.isEncrypted + except: + pdf_encrypted = pdf_doc.is_encrypted + if pdf_encrypted: + pdf_doc.authenticate("") + pdf_blocks = {} + for page_num in tqdm(range(pdf_doc.page_count), disable=False): + page = pdf_doc[page_num] + blocks = self.parse_blocks_page(page) + pdf_blocks[page_num] = blocks + return pdf_blocks + + def search_for_text(self, page_text, search_str): + """ + Search for the search string within the document lines + """ + # Find all matches within one line + result_iter = re.finditer(search_str, page_text, re.IGNORECASE) + results = [ + result.group() for result in result_iter if result.group().strip() != "" + ] + # In case multiple matches within one line + return results + + def redact_matching_data(self, page, matched_value): + """ + Redacts matching values + """ + logger.info(f"Redacting matching values in {self.pdf_file}") + matching_val_area = page.search_for(matched_value) + # Redact matching values + [ + page.add_redact_annot(area, text=" ", fill=(0, 0, 0)) + for area in matching_val_area + ] + # Apply the redaction + page.apply_redactions() + return matching_val_area + + def 
frame_matching_data(self, page, matched_value): + """ + frames matching values + """ + matching_val_area = page.search_for(matched_value) + for area in matching_val_area: + if isinstance(area, fitz.fitz.Rect): + # Draw a rectangle around matched values + annot = page.add_redact_annot(area) + # , fill = fitz.utils.getColor('black') + annot.setColors(stroke=fitz.utils.getColor("red")) + # If you want to remove matched data + # page.addFreetextAnnot(area, ' ') + annot.update() + return matching_val_area + + def highlight_rectangle(self, + pdf_doc: fitz.Document, + page_index: int, + bbox: list, + title: str = "", + content: dict = {}): + """ + Highlight rectangle + """ + rectangle = fitz.Rect(bbox[0], bbox[1], bbox[2], bbox[3]) + page = pdf_doc[page_index] + highlight = page.add_highlight_annot([rectangle]) + content_text = json.dumps(content) + highlight.set_info(content=content_text, title=title) + highlight.update() + + def highlight_matching_data( + self, + page, + text_block, + within_bbox: list = None, + highlight_text_inside_block: str = None, + content: dict = {}, + title: str = "", + only_hightlight_first: bool = False, + exact_match: bool = False, + ): + """ + Highlight matching values + """ + # logger.info(f"Highlighting matching values in {self.pdf_file}") + if within_bbox is not None: + matching_val_area = page.search_for( + text_block, clip=fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3]) + ) + if len(matching_val_area) == 0: + matching_val_area = page.search_for(text_block.replace('\n', '').replace('-', ''), + clip=fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3])) + if len(matching_val_area) == 0: + matching_val_area = page.search_for(text_block.replace('-\n', ''), + clip=fitz.Rect(within_bbox[0], within_bbox[1], within_bbox[2], within_bbox[3])) + if len(matching_val_area) == 0: + matching_val_area = page.search_for(text_block) + else: + matching_val_area = page.search_for(text_block) + if len(matching_val_area) == 0: + matching_val_area = page.search_for(text_block.replace('\n', '').replace('-', '')) + if len(matching_val_area) == 0: + matching_val_area = page.search_for(text_block.replace('-\n', '')) + if ( + highlight_text_inside_block is not None + and len(highlight_text_inside_block) > 0 + ): + highlight_bbox_list = [] + for area in matching_val_area: + text_bbox_area = page.search_for( + highlight_text_inside_block, + clip=[area.x0, area.y0, area.x1, area.y1], + ) + if text_bbox_area is not None and len(text_bbox_area) > 0: + if only_hightlight_first: + highlight_bbox_list.append(text_bbox_area[0]) + break + else: + highlight_bbox_list.extend(text_bbox_area) + matching_val_area = highlight_bbox_list + else: + if only_hightlight_first: + matching_val_area = [matching_val_area[0]] + + if matching_val_area is not None and len(matching_val_area) > 0: + matching_val_area = self.merge_matching_val_area(matching_val_area) + if exact_match: + matching_val_area = self.get_exact_match_area(page, matching_val_area, text_block) + # matching_val_area = self.merge_matching_val_area(matching_val_area) + for area in matching_val_area: + highlight = page.add_highlight_annot([area]) + bbox_list = [area.x0, area.y0, area.x1, area.y1] + content["bbox"] = bbox_list + content_text = json.dumps(content) + highlight.set_info(content=content_text, title=title) + highlight.update() + return matching_val_area + + def get_exact_match_area(self, page, matching_val_area, search_text): + results = [] + for area in matching_val_area: + area_text = 
page.get_text("text", clip=area).strip() + area_text_list = area_text.split() + search_text_list = search_text.split() + capital_not_match = False + any_word_match = False + for search_split in search_text_list: + if search_split in area_text_list: + any_word_match = True + search_split_lower = search_split.lower() + if search_split_lower in area_text_list and \ + search_split not in area_text_list: + capital_not_match = True + break + if capital_not_match: + continue + elif any_word_match: + results.append(area) + else: + pass + return results + + def merge_matching_val_area(self, matching_val_area): + """ + Merge the matching val areas which with same y0 and y1, + the x0 is the min x0, x1 is the max x1 + """ + if matching_val_area is None or len(matching_val_area) == 0: + return matching_val_area + if len(matching_val_area) == 1: + return matching_val_area + # unify the y0 and y1 which are close to each other (less than 5 pixels) + y0_list = [] + y1_list = [] + for area in matching_val_area: + y0 = area.y0 + y1 = area.y1 + if len(y0_list) == 0: + y0_list.append(y0) + y1_list.append(y1) + else: + for t_y0 in y0_list: + if abs(t_y0 - y0) < 5: + area.y0 = t_y0 + else: + if y0 not in y0_list: + y0_list.append(y0) + for t_y1 in y1_list: + if abs(t_y1 - y1) < 5: + area.y1 = t_y1 + else: + if y1 not in y1_list: + y1_list.append(y1) + # get area list which with same y0 and y1 + y0_y1_list = list(set([(area.y0, area.y1) for area in matching_val_area])) + + new_matching_val_area = [] + for y0_y1 in y0_y1_list: + y0 = y0_y1[0] + y1 = y0_y1[1] + x0_list = [area.x0 for area in matching_val_area if area.y0 == y0 and area.y1 == y1] + x1_list = [area.x1 for area in matching_val_area if area.y0 == y0 and area.y1 == y1] + min_x0 = min(x0_list) + max_x1 = max(x1_list) + new_matching_val_area.append(fitz.Rect(min_x0, y0, max_x1, y1)) + return new_matching_val_area + + def highlight_matching_paragraph_text( + self, + pdf_doc: fitz.Document, + page_index: int, + search_paragraph_text: str, + sibling_paragraph_text_list: list = [], + next_page_found_lines: list = [], + content: dict = {}, + title: str = "", + ): + page = pdf_doc[page_index] + page_text = page.get_text("text") + page_lines = [ + line for line in page_text.split("\n") if len(line.strip()) > 0 + ] + matching_val_area = [] + find_begin = False + search_paragraph_text_words = [ + word.strip() + for word in search_paragraph_text.lower().split() + if len(word.strip()) > 0 + ] + found_words = [] + found_lines = [] + jacard_similarity = 0 + found_matched = False + found_lines_dict = {} + for index, line in enumerate(page_lines): + if len(next_page_found_lines) > 0: + if line in next_page_found_lines: + continue + words = [ + word.strip() for word in line.lower().split() if len(word.strip()) > 0 + ] + if len(words) == 0: + continue + if find_begin: + found_words.extend(words) + new_jacard_similarity = self.similarity.jaccard_similarity( + search_paragraph_text_words, found_words + ) + if new_jacard_similarity > jacard_similarity: + jacard_similarity = new_jacard_similarity + found_lines.append(line) + else: + if jacard_similarity > 0.4: + found_matched = True + break + else: + if search_paragraph_text_words[0].lower() in line.lower() and \ + search_paragraph_text_words[1].lower() in line.lower() and \ + search_paragraph_text_words[2].lower() in line.lower(): + jacard_similarity = self.similarity.jaccard_similarity( + search_paragraph_text_words, words + ) + if jacard_similarity > 0.05: + find_begin = True + found_words.extend(words) + 
found_lines.append(line) + if jacard_similarity > 0.4: + found_matched = True + + if found_matched and len(found_lines) > 0: + total_matching_val_area = [] + for line in found_lines: + matching_val_area = page.search_for(line) + if len(matching_val_area) == 0: + matching_val_area = page.search_for(line.strip()) + + if len(matching_val_area) == 0: + continue + elif len(matching_val_area) == 1: + total_matching_val_area.extend(matching_val_area) + else: + y1_list = [area.y1 for area in matching_val_area] + if len(total_matching_val_area) == 0: + y1_min = max(y1_list) + y1_min_index = y1_list.index(y1_min) + total_matching_val_area.append(matching_val_area[y1_min_index]) + else: + last_y1 = total_matching_val_area[-1].y1 + latest_bigger_y1_list = max( + [y1 for y1 in y1_list if y1 > last_y1] + ) + latest_bigger_y1_index = y1_list.index(latest_bigger_y1_list) + total_matching_val_area.append( + matching_val_area[latest_bigger_y1_index] + ) + # get min x0, min y0, max x1, max y1 from total_matching_val_area + x0_list = [area.x0 for area in total_matching_val_area] + y0_list = [area.y0 for area in total_matching_val_area] + x1_list = [area.x1 for area in total_matching_val_area] + y1_list = [area.y1 for area in total_matching_val_area] + min_x0 = min(x0_list) + min_y0 = min(y0_list) + max_x1 = max(x1_list) + max_y1 = max(y1_list) + matching_val_area = [fitz.Rect(min_x0, min_y0, max_x1, max_y1)] + highlight = page.add_highlight_annot(matching_val_area) + bbox_list = [[min_x0, min_y0, max_x1, max_y1]] + content["bbox"] = bbox_list + content_text = json.dumps(content) + highlight.set_info(content=content_text, title=title) + highlight.update() + + found_lines_dict = { + page_index: {"bbox_list": bbox_list, "found_lines": found_lines} + } + + # jacard_similarity is between 0.4 and 0.9, + # perhaps there are left lines in next page, so we need to check the next page. 
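+ # (A score of 0.9 or higher is treated as a complete match on this page and a score of 0.4 or lower as no match, so the cross-page continuation check below only runs in between.)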
+ if jacard_similarity > 0.4 and jacard_similarity < 0.9: + next_page_index = page_index + 1 + if next_page_index < pdf_doc.page_count: + next_page = pdf_doc[next_page_index] + next_page_text = next_page.get_text("text") + next_page_lines = [ + line + for line in next_page_text.split("\n") + if len(line.strip()) > 0 + ] + + found_line_index = -1 + for i in range(10): + if len(next_page_lines) < i + 1: + break + next_page_line = next_page_lines[i] + words = [ + word.strip() + for word in next_page_line.lower().split() + if len(word.strip()) > 0 + ] + if len(words) == 0: + continue + temp_found_words = found_words + words + new_jacard_similarity = self.similarity.jaccard_similarity( + search_paragraph_text_words, temp_found_words + ) + if new_jacard_similarity > jacard_similarity: + found_line_index = i + break + if found_line_index != -1: + new_found_words = found_words + new_found_lines = [] + found_matched = False + for index, line in enumerate(next_page_lines): + if index < found_line_index: + continue + words = [ + word.strip() + for word in line.lower().split() + if len(word.strip()) > 0 + ] + if len(words) == 0: + continue + new_found_words.extend(words) + new_jacard_similarity = ( + self.similarity.jaccard_similarity( + search_paragraph_text_words, new_found_words + ) + ) + if new_jacard_similarity > jacard_similarity: + jacard_similarity = new_jacard_similarity + new_found_lines.append(line) + else: + break + if len(new_found_lines) > 0: + total_matching_val_area = [] + for line in new_found_lines: + matching_val_area = next_page.search_for(line) + if len(matching_val_area) == 0: + matching_val_area = page.search_for(line.strip()) + + if len(matching_val_area) == 0: + continue + elif len(matching_val_area) == 1: + total_matching_val_area.extend(matching_val_area) + else: + y1_list = [area.y1 for area in matching_val_area] + if len(total_matching_val_area) == 0: + y1_min = max(y1_list) + y1_min_index = y1_list.index(y1_min) + total_matching_val_area.append( + matching_val_area[y1_min_index] + ) + else: + last_y1 = total_matching_val_area[-1].y1 + latest_bigger_y1_list = max( + [y1 for y1 in y1_list if y1 > last_y1] + ) + latest_bigger_y1_index = y1_list.index( + latest_bigger_y1_list + ) + total_matching_val_area.append( + matching_val_area[latest_bigger_y1_index] + ) + # get min x0, min y0, max x1, max y1 from total_matching_val_area + x0_list = [area.x0 for area in total_matching_val_area] + y0_list = [area.y0 for area in total_matching_val_area] + x1_list = [area.x1 for area in total_matching_val_area] + y1_list = [area.y1 for area in total_matching_val_area] + min_x0 = min(x0_list) + min_y0 = min(y0_list) + max_x1 = max(x1_list) + max_y1 = max(y1_list) + matching_val_area = [ + fitz.Rect(min_x0, min_y0, max_x1, max_y1) + ] + highlight = next_page.add_highlight_annot(matching_val_area) + new_bbox_list = [[min_x0, min_y0, max_x1, max_y1]] + content["found_page"] = next_page_index + content["bbox"] = new_bbox_list + content_text = json.dumps(content) + highlight.set_info(content=content_text, title=title) + highlight.update() + found_lines_dict[next_page_index] = { + "bbox_list": new_bbox_list, + "found_lines": new_found_lines, + } + + found_lines_dict_keys = list(found_lines_dict.keys()) + exact_match = True + exact_match_search_paragraph_text = search_paragraph_text + if len(found_lines_dict_keys) > 0 and len(sibling_paragraph_text_list) > 0: + found_line_list = [] + for key in found_lines_dict_keys: + found_line_list.extend(found_lines_dict[key]["found_lines"]) + found_line_words 
= [] + for line in found_line_list: + words = [ + word.strip() + for word in line.lower().split() + if len(word.strip()) > 0 + ] + found_line_words.extend(words) + + max_sibling_jacard_similarity = 0 + max_sibling_jacard_similarity_index = -1 + for index, sibling_paragraph_text in enumerate( + sibling_paragraph_text_list + ): + sibling_paragraph_text_words = [ + word.strip() + for word in sibling_paragraph_text.lower().split() + if len(word.strip()) > 0 + ] + sibling_jacard_similarity = self.similarity.jaccard_similarity( + sibling_paragraph_text_words, found_line_words + ) + if sibling_jacard_similarity > max_sibling_jacard_similarity: + max_sibling_jacard_similarity = sibling_jacard_similarity + max_sibling_jacard_similarity_index = index + if max_sibling_jacard_similarity > jacard_similarity: + exact_match = False + exact_match_search_paragraph_text = sibling_paragraph_text_list[ + max_sibling_jacard_similarity_index + ] + + return { + "found_lines_dict": found_lines_dict, + "exact_match": exact_match, + "exact_match_search_paragraph_text": exact_match_search_paragraph_text, + } + + def get_page_range_by_keywords( + self, + pdf_doc, + start_keywords, + end_keywords, + return_page_text_list=False, + ): + """ + Get page range by keywords + pdf_doc: pdf document + page_range_start_keywords: list of start keywords + page_range_end_keywords: list of end keywords + return_page_text_list: return page text list or not + """ + start_page = -1 + end_page = -1 + if len(start_keywords) == 0 or len(end_keywords) == 0: + start_page = 0 + if len(start_keywords) == 0 and len(end_keywords) == 0: + end_page = pdf_doc.page_count - 1 + search_start = 0 + # avoid to search the TOC part + if pdf_doc.page_count > 20: + search_start = 8 + for page_index in range(search_start, pdf_doc.page_count): + if start_page >= 0 and end_page >= 0: + break + page = pdf_doc[page_index] + page_text = page.get_text("text").strip() + page_text_list = [ + split.strip() + for split in page_text.split("\n") + if len(split.strip()) > 0 + ] + if start_page == -1: + find = self.find_keywords_in_text_list(page_text_list, start_keywords) + if find: + start_page = page_index + + if start_page >= 0 and end_page == -1: + find = self.find_keywords_in_text_list(page_text_list, end_keywords) + if find: + end_page = page_index + break + # return page_list which starts from start_page and ends at end_page + page_text_list = [] + if start_page >= 0 and end_page >= 0: + page_list = [i for i in range(start_page, end_page)] + if return_page_text_list: + for page_index in page_list: + page = pdf_doc[page_index] + page_text = page.get_text("text").strip() + page_text_list.append(page_text) + else: + page_list = [] + return page_list, page_text_list + + def exist_keywords_in_text_list(self, page_text, keywords): + page_text_list = [ + split.strip() for split in page_text.split("\n") if len(split.strip()) > 0 + ] + find = self.find_keywords_in_text_list(page_text_list, keywords) + return find + + def find_keywords_in_text_list(self, text_list, keywords): + """ + Find keywords in text list + """ + find = False + for keyword in keywords: + for index, line in enumerate(text_list): + if line.lower().startswith(keyword.lower()): + lower_case_begin_words_count = ( + self.get_lower_case_begin_words_count(line) + ) + if lower_case_begin_words_count > 3: + continue + + if line.upper() == line: + find = True + break + + if index != 0: + lower_case_begin_words_count = ( + self.get_lower_case_begin_words_count(text_list[index - 1]) + ) + if 
lower_case_begin_words_count > 3: + continue + + if index > 5: + if "." in line or "," in line: + continue + find = True + break + if find: + break + return find + + def get_lower_case_begin_words_count(self, text): + count = 0 + for word in text.split(): + if word[0].islower(): + count += 1 + return count + + def process_data( + self, + output_file: str, + dp_Value_info: dict, + pages: Tuple = None, + action: str = "Highlight", + ): + """ + Process the pages of the PDF File + 1. Open the input file. + 2. Create a memory buffer for storing temporarily the output file. + 3. Initialize a variable for storing the total number of matches of the string we were searching for. + 4. Iterate throughout the selected pages of the input file and split the current page into lines. + 5. Search for the string within the page. + 6. Apply the corresponding action (i.e "Redact", "Frame", "Highlight", etc.) + 7. Display a message signaling the status of the search process. + 8. Save and close the input file. + 9. Save the memory buffer to the output file. + + output_file: The path of the PDF file to generate after processing. + dp_Value_info: The information for data points. + pages: The pages to consider while processing the PDF file. + action: The action to perform on the PDF file. + """ + logger.info(f"Processing {self.pdf_file}") + data_list = [] + try: + # Save the generated PDF to memory buffer + pdf_doc = fitz.open(self.pdf_file) + try: + pdf_encrypted = pdf_doc.isEncrypted + except: + pdf_encrypted = pdf_doc.is_encrypted + if pdf_encrypted: + pdf_doc.authenticate("") + output_buffer = BytesIO() + find_value_dp_list = [] + matching_val_area_list = [] + + page_list = [i for i in range(pdf_doc.page_count)] + + dp_range_page_list = {} + + for dp_name, dp_detail in dp_Value_info.items(): + if not isinstance(dp_detail, dict): + continue + page_range_start_keywords = dp_detail.get( + "page_range_start_keywords", [] + ) + page_range_end_keywords = dp_detail.get("page_range_end_keywords", []) + if ( + len(page_range_start_keywords) > 0 + and len(page_range_end_keywords) > 0 + ): + page_list, page_text_list = self.get_page_range_by_keywords( + pdf_doc, + page_range_start_keywords, + page_range_end_keywords, + return_page_text_list=False, + ) + dp_range_page_list[dp_name] = page_list + + # Iterate through pages + next_page_found_lines = [] + for page_index in page_list: + # If required for specific pages + if pages: + if page_index not in pages: + continue + # Select the page + page = pdf_doc[page_index] + # Get Matching Data + # Split page by lines + page_text = page.get_text("text") + # if page_index in [24, 25]: + # print(page_text) + for dp_name, dp_detail in dp_Value_info.items(): + if not isinstance(dp_detail, dict): + continue + dp_biz_name = dp_detail.get("biz_name", "") + dp_level = dp_detail.get("level", "") + dp_value = dp_detail.get("value", "") + value_text_type = dp_detail.get("value_text_type", "string") + if value_text_type == "string": + dp_value_text = dp_detail.get("value_text", "") + elif value_text_type == "list": + dp_value_text = dp_detail.get("value_text", []) + else: + dp_value_text = dp_detail.get("value_text", "") + value_text_structure = dp_detail.get("value_text_structure", "word") + + inner_context_regex = dp_detail.get("inner_context_regex", "") + text_value_dict = dp_detail.get("text_value_dict", {}) + + search_str_list = dp_detail.get("regex_list", []) + only_hightlight_value_text = dp_detail.get( + "only_hightlight_value_text", False + ) + only_hightlight_first = dp_detail.get( 
+ "only_hightlight_first", False + ) + # logger.info(f"Processing Data Point: {dp_name}") + + page_range_start_keywords = dp_detail.get( + "page_range_start_keywords", [] + ) + page_range_end_keywords = dp_detail.get( + "page_range_end_keywords", [] + ) + if ( + len(page_range_start_keywords) > 0 + and len(page_range_end_keywords) > 0 + ): + if not page_index in dp_range_page_list.get(dp_name, []): + continue + + # find_value = False + if value_text_structure == "paragraph": + found = dp_detail.get("found", False) + if found: + continue + if ( + dp_detail.get("matched_page", -1) != -1 + and len(dp_detail.get("bbox_list", [])) > 0 + ): + continue + sibling_paragraph_text_list = dp_detail.get( + "sibling_paragraph_text_list", [] + ) + content = { + "data_point": dp_biz_name, + "data_point_db_name": dp_name, + "data_point_level": dp_level, + "found_page": page_index, + "bbox": None, + } + + found_dict = self.highlight_matching_paragraph_text( + pdf_doc=pdf_doc, + page_index=page_index, + search_paragraph_text=dp_value_text, + sibling_paragraph_text_list=sibling_paragraph_text_list, + next_page_found_lines=next_page_found_lines, + content=content, + title=dp_biz_name, + ) + + found_lines_dict = found_dict.get("found_lines_dict", {}) + exact_match = found_dict.get("exact_match", True) + exact_match_search_paragraph_text = found_dict.get( + "exact_match_search_paragraph_text", dp_value_text + ) + found_lines_dict_keys = list(found_lines_dict.keys()) + if len(found_lines_dict_keys) > 0: + found_next_page_lines = False + if exact_match: + dp_detail["found"] = True + for found_page_index, found_lines_info in found_lines_dict.items(): + bbox_list = found_lines_info.get("bbox_list", []) + bbox_normalized_list = self.get_bbox_normalized( + page, bbox_list + ) + found_lines = found_lines_info.get("found_lines", []) + if found_page_index == page_index + 1: + next_page_found_lines = found_lines + found_next_page_lines = True + found_text = " ".join(found_lines).strip() + data = { + "pdf_file": self.simple_pdf_file, + "dp_name": dp_name, + "dp_biz_name": dp_biz_name, + "dp_level": dp_level, + "ground_truth": dp_value_text, + "ground_truth_text": dp_value_text, + "search_str": "", + "found_page": found_page_index, + "found_value": found_text, + "found_value_context": found_text, + "found_bbox": bbox_list, + "found_bbox_normalized": bbox_normalized_list, + "output_file": output_file, + "action": action, + "comment": "found page number is page number, as page number starts from 0.", + } + if dp_name not in find_value_dp_list: + find_value_dp_list.append(dp_name) + data_list.append(data) + else: + for dp_name, dp_detail in dp_Value_info.items(): + value_text_structure = dp_detail.get("value_text_structure", "word") + if value_text_structure == "paragraph": + dp_value_text = dp_detail.get("value_text", "") + if dp_value_text == exact_match_search_paragraph_text: + dp_detail["found"] = True + break + for found_page_index, found_lines_info in found_lines_dict.items(): + bbox_list = found_lines_info.get("bbox_list", []) + bbox_normalized_list = self.get_bbox_normalized( + page, bbox_list + ) + found_lines = found_lines_info.get("found_lines", []) + if found_page_index == page_index + 1: + next_page_found_lines = found_lines + found_next_page_lines = True + found_text = " ".join(found_lines).strip() + data = { + "pdf_file": self.simple_pdf_file, + "dp_name": dp_name, + "dp_biz_name": dp_biz_name, + "dp_level": dp_level, + "ground_truth": exact_match_search_paragraph_text, + "ground_truth_text": 
exact_match_search_paragraph_text, + "search_str": "", + "found_page": found_page_index, + "found_value": found_text, + "found_value_context": found_text, + "found_bbox": bbox_list, + "found_bbox_normalized": bbox_normalized_list, + "output_file": output_file, + "action": action, + "comment": "found page number is page number, as page number starts from 0.", + } + if dp_name not in find_value_dp_list: + find_value_dp_list.append(dp_name) + data_list.append(data) + if not found_next_page_lines: + next_page_found_lines = [] + else: + if search_str_list is not None and len(search_str_list) > 0: + matched_blocks = [] + for search_str in search_str_list: + found_blocks = self.search_for_text( + page_text, search_str + ) + if found_blocks: + matched_blocks.extend(found_blocks) + else: + # get matched blocks by similarity for dp_value_text + # the data point value is string, not totally same as the value in the database. + # such as, dp_name is FundName or ShareClassName or Advisor or Strategy + matched_blocks = [] + if matched_blocks: + for matched_block in matched_blocks: + dp_value = "" + if inner_context_regex != "": + dp_value_text_search = re.search(inner_context_regex, matched_block, re.IGNORECASE) + if dp_value_text_search is not None: + dp_value_text = dp_value_text_search.group(0).strip() + # remove special characters + dp_value_text = re.sub(r"\W+", " ", dp_value_text).strip() + if dp_value_text == "may": + continue + if dp_value_text != "" and len(text_value_dict.keys()) > 0: + dp_value = text_value_dict.get(dp_value_text.lower(), "") + else: + dp_value_text = matched_block + dp_value = matched_block + else: + if dp_value_text == "": + dp_value_text = matched_block + dp_value = matched_block + content = { + "data_point_name": dp_biz_name, + "data_point_db_name": dp_name, + "data_point_level": dp_level, + "page_index": page_index, + "dp_value_text": dp_value_text, + "dp_value": dp_value, + "bbox": None, + } + if only_hightlight_value_text and dp_value_text != "" and dp_value_text != matched_block: + matching_val_area = self.highlight_matching_data( + page=page, + text_block=matched_block, + highlight_text_inside_block=dp_value_text, + content=content, + title=dp_biz_name, + only_hightlight_first=only_hightlight_first, + ) + else: + matching_val_area = self.highlight_matching_data( + page=page, + text_block=matched_block, + highlight_text_inside_block=None, + content=content, + title=dp_biz_name, + only_hightlight_first=only_hightlight_first, + ) + if len(matching_val_area) > 0: + matching_val_area_list.extend(matching_val_area) + + if len(matching_val_area) > 0: + bbox_list = [] + for area in matching_val_area: + bbox_list.append( + [area.x0, area.y0, area.x1, area.y1] + ) + bbox_normalized_list = self.get_bbox_normalized( + page, bbox_list + ) + data = { + "pdf_file": self.simple_pdf_file, + "dp_name": dp_name, + "dp_biz_name": dp_biz_name, + "dp_level": dp_level, + "ground_truth": dp_value, + "ground_truth_text": dp_value_text, + "search_str": search_str, + "found_page": page_index, + "found_value_text": dp_value_text, + "found_value": dp_value, + "found_value_context": matched_block.strip(), + "found_bbox": bbox_list, + "found_bbox_normalized": bbox_normalized_list, + "output_file": output_file, + "action": action, + "comment": "found page number is page number, as page number starts from 0.", + } + if dp_name not in find_value_dp_list: + find_value_dp_list.append(dp_name) + data_list.append(data) + # find_value = True + if only_hightlight_first: + break + + if 
len(find_value_dp_list) == 0: + output_file = "" + for dp_name, dp_detail in dp_Value_info.items(): + if dp_name not in find_value_dp_list: + dp_biz_name = dp_detail.get("biz_name", "") + dp_level = dp_detail.get("level", "") + dp_value = dp_detail.get("value", "") + dp_value_text = dp_detail.get("value_text", "") + data = { + "pdf_file": self.simple_pdf_file, + "dp_name": dp_name, + "dp_biz_name": dp_biz_name, + "dp_level": dp_level, + "ground_truth": dp_value, + "ground_truth_text": dp_value_text, + "search_str": "", + "found_page": -1, + "found_value_text": "", + "found_value": -1, + "found_value_context": "", + "found_bbox": [], + "found_bbox_normalized": [], + "output_file": output_file, + "action": action, + "comment": f"Not found {dp_biz_name} in the document.", + } + data_list.append(data) + logger.info( + f"{len(matching_val_area_list)} Match(es) In Input File: {self.pdf_file}" + ) + # Save to output + pdf_doc.save(output_buffer) + pdf_doc.close() + if len(find_value_dp_list) > 0 and \ + output_file is not None and \ + output_file != "": + # Save the output buffer to the output file + with open(output_file, mode="wb") as f: + f.write(output_buffer.getbuffer()) + logger.info(f"File saved to {output_file}") + except Exception as e: + logger.error(f"Error processing file: {e}") + print_exc() + if len(data_list) == 0: + data = { + "pdf_file": self.simple_pdf_file, + "dp_name": "", + "dp_biz_name": "", + "dp_level": "", + "ground_truth": "", + "ground_truth_text": "", + "search_str": "", + "found_page": -1, + "found_value_text": "", + "found_value": -1, + "found_value_context": "", + "found_bbox": [], + "found_bbox_normalized": [], + "output_file": output_file, + "action": action, + "comment": "", + } + data_list.append(data) + return data_list + + def get_bbox_normalized(self, page, bbox): + page_width = page.rect.width + page_height = page.rect.height + bbox_normalized = [] + for box in bbox: + x0 = box[0] / page_width + y0 = box[1] / page_height + x1 = box[2] / page_width + y1 = box[3] / page_height + bbox_normalized.append([x0, y0, x1, y1]) + return bbox_normalized + + def find_value_by_regex(self, page_text, search_str: str): + pass + + def get_high_similarity_text( + self, page_text, search_str: str, threshold: float = 0.8 + ): + matched_values = [] + page_text_list = page_text.split("\n") + return matched_values + + def remove_highlght(self, output_file: str, pages: Tuple = None): + """ + 1. Open the input file. + 2. Create a memory buffer for storing temporarily the output file. + 3. Iterate throughout the pages of the input file and checks if annotations are found. + 4. Delete these annotations. + 5. Display a message signaling the status of this process. + 6. Close the input file. + 7. Save the memory buffer to the output file. 
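+ Note: when a pages filter is passed, each page index is compared as str(page_number), so entries should be given as strings.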
+ """ + logger.info(f"Removing Highlights from {self.pdf_file}") + try: + # Save the generated PDF to memory buffer + pdf_doc = fitz.open(self.pdf_file) + try: + pdf_encrypted = pdf_doc.isEncrypted + except: + pdf_encrypted = pdf_doc.is_encrypted + if pdf_encrypted: + pdf_doc.authenticate("") + output_buffer = BytesIO() + # Initialize a counter for annotations + annot_found = 0 + # Iterate through pages + for pg in range(pdf_doc.page_count): + # If required for specific pages + if pages: + if str(pg) not in pages: + continue + # Select the page + page = pdf_doc[pg] + annot = page.first_annot + while annot: + annot_found += 1 + page.delete_annot(annot) + annot = annot.next + if annot_found >= 0: + print(f"Annotation(s) Found In The Input File: {self.pdf_file}") + # Save to output + pdf_doc.save(output_buffer) + pdf_doc.close() + # Save the output buffer to the output file + with open(output_file, mode="wb") as f: + f.write(output_buffer.getbuffer()) + except Exception as e: + logger.error(f"Error removing highlights: {e}") + print_exc() + + def process_file( + self, + output_file: str, + dp_Value_info: dict = None, + pages: Tuple = None, + action: str = "Highlight", + ): + """ + To process one single file + Redact, Frame, Highlight... one PDF File + Remove Highlights from a single PDF File + action: Redact, Frame, Highlight, Squiggly, Underline, Strikeout, Remove + """ + logger.info(f"Processing {self.pdf_file}") + + if output_file is None: + output_file = self.pdf_file + + data_list = [] + # Redact, Frame, Highlight, Squiggly, Underline, Strikeout, Remove + if action == "Remove": + # Remove the Highlights except Redactions + self.remove_highlght(output_file=output_file, pages=pages) + else: + data_list = self.process_data( + output_file=output_file, + dp_Value_info=dp_Value_info, + pages=pages, + action=action, + ) + return data_list diff --git a/utils/s3_util.py b/utils/s3_util.py new file mode 100644 index 0000000..177d7c4 --- /dev/null +++ b/utils/s3_util.py @@ -0,0 +1,40 @@ +""" upload one directory from the current working directory to aws """ +from pathlib import Path +import os +import glob +import boto3 + +def upload_dir(local_dir, aws_init_dir, bucket_name, tag, prefix='/'): + """ + from current working directory, upload a 'localDir' with all its subcontents (files and subdirectories...) + to a aws bucket + Parameters + ---------- + local_dir : localDirectory to be uploaded, with respect to current working directory + aws_init_dir : prefix 'directory' in aws + bucket_name : bucket in aws + tag : tag to select files, like *png + NOTE: if you use tag it must be given like --tag '*txt', in some quotation marks... 
for argparse + prefix : to remove initial '/' from file names + + Returns + ------- + None + """ + s3 = boto3.resource('s3') + cwd = str(Path.cwd()) + p = Path(os.path.join(Path.cwd(), local_dir)) + mydirs = list(p.glob('**')) + for mydir in mydirs: + file_names = glob.glob(os.path.join(mydir, tag)) + file_names = [f for f in file_names if not Path(f).is_dir()] + rows = len(file_names) + for i, file_name in enumerate(file_names): + # file_name = str(file_name).replace(cwd, '') + s3_file_name = "" + if file_name.startswith(prefix): # only modify the text if it starts with the prefix + s3_file_name = file_name.replace(prefix, "", 1) # remove one instance of prefix + print(f"fileName {file_name}") + if len(s3_file_name) > 0: + s3_path = os.path.join(aws_init_dir, str(s3_file_name)) + s3.meta.client.upload_file(file_name, bucket_name, s3_path) \ No newline at end of file diff --git a/utils/similarity.py b/utils/similarity.py new file mode 100644 index 0000000..b5ddb44 --- /dev/null +++ b/utils/similarity.py @@ -0,0 +1,165 @@ +""" +@version: 0.1 +@author: Blade He +@license: Morningstar +@contact: blade.he@morningstar.com +@site: +@software: PyCharm +@file: Similarity.py +@time: 2019/03/20 +""" +from math import * +from decimal import Decimal +import math +import re + + +class Similarity: + """ Five similarity measures function """ + def euclidean_distance(self, x, y): + """ return euclidean distance between two lists """ + + return sqrt(sum(pow(a - b, 2) for a, b in zip(x, y))) + + def manhattan_distance(self, x, y): + """ return manhattan distance between two lists """ + + return sum(abs(a - b) for a, b in zip(x, y)) + + def minkowski_distance(self, x, y, p_value): + """ return minkowski distance between two lists """ + + return self.nth_root(sum(pow(abs(a - b), p_value) for a, b in zip(x, y)), p_value) + + def nth_root(self, value, n_root): + """ returns the n_root of an value """ + + root_value = 1 / float(n_root) + return round(Decimal(value) ** Decimal(root_value), 3) + + def cosine_similarity(self, x, y): + """ return cosine similarity between two lists """ + + numerator = sum(a * b for a, b in zip(x, y)) + denominator = self.square_rooted(x) * self.square_rooted(y) + return round(numerator / float(denominator), 3) + + def square_rooted(self, x): + """ return 3 rounded square rooted value """ + + return round(sqrt(sum([a * a for a in x])), 3) + + def jaccard_similarity(self, x: list, y: list): + """ returns the jaccard similarity between two lists """ + intersection_cardinality = len(set.intersection(*[set(x), set(y)])) + union_cardinality = len(set.union(*[set(x), set(y)])) + if union_cardinality == 0: + return 0 + return intersection_cardinality / float(union_cardinality) + + + def y_in_x_similarity(self, x: list, y: list): + """ returns the jaccard similarity between two lists """ + intersection_cardinality = len(set.intersection(*[set(x), set(y)])) + len_y = len(set(y)) + if len_y == 0: + return 0 + return intersection_cardinality / float(len_y) + + def compare_text_in_text_list_similarity(self, text: str, compare_text_list: list): + if text is None or len(text) == 0: + return 0 + if compare_text_list is None or len(compare_text_list) == 0: + return 0 + # remove specical case for text + text = text.lower() + + # Fix issue for matching fund feeder + # It's the case for the following text: + # Raw fund name: Schroders Capital UK Real Estate Fund Feeder Trust + # Fund name list in database: + # Schroder UK Real Estate Fund Feeder Trust + # Schroders Capital UK Real Estate Fund + # The 
matching should be Schroder UK Real Estate Fund Feeder Trust. + # But somehow, the matching is Schroders Capital UK Real Estate Fund, + # it's incorrect. + if "feeder" in text.split(): + need_tranform = False + for compare in compare_text_list: + if "feeder" in compare.lower().split(): + need_tranform = True + break + if need_tranform: + temp_max_similarity = 0 + temp_max_similarity_text = "" + for compare in compare_text_list: + compare = compare.lower() + if "feeder" in compare.split(): + similarity = self.y_in_x_similarity(text.split(), compare.split()) + if similarity > temp_max_similarity: + temp_max_similarity = similarity + temp_max_similarity_text = compare + if temp_max_similarity > 0: + text = temp_max_similarity_text + + text = re.sub(r'\W', ' ', text) + text = re.sub(r'\s+', ' ', text) + text_split = list(set([word for word in text.split() + if word.lower() not in ["name", "fund", "funds"]])) + if len(text_split) == 0: + return 0, "" + max_similarity = 0 + max_similarity_text = "" + max_similarity_text_split = [] + for comapare_text in compare_text_list: + updated_comapare_text = comapare_text.lower() + updated_comapare_text = re.sub(r'\W', ' ', updated_comapare_text) + updated_comapare_text = re.sub(r'\s+', ' ', updated_comapare_text) + comapare_text_split = list(set([word for word in updated_comapare_text.split() + if word.lower() not in ["name", "fund", "funds"]])) + if len(comapare_text_split) == 0: + continue + similarity = self.y_in_x_similarity(text_split, comapare_text_split) + if similarity > 0 and similarity == max_similarity: + if len(comapare_text_split) > len(max_similarity_text_split): + max_similarity_text = comapare_text + max_similarity_text_split = comapare_text_split + if similarity > max_similarity: + max_similarity = similarity + max_similarity_text = comapare_text + max_similarity_text_split = comapare_text_split + + return max_similarity, max_similarity_text + + + def edit_distance_similarity(self, left: str, right: str): + m, n = len(left) + 1, len(right) + 1 + # create a matrix (m*n) + matrix = [[0] * n for i in range(m)] + matrix[0][0] = 0 + for i in range(1, m): + matrix[i][0] = matrix[i - 1][0] + 1 + + for j in range(1, n): + matrix[0][j] = matrix[0][j - 1] + 1 + + # for i in range(m): + # print(matrix[i]) + # + # print() + "********************" + for i in range(1, m): + for j in range(1, n): + if left[i - 1] == right[j - 1]: + cost = 0 + else: + cost = 1 + + matrix[i][j] = min(matrix[i - 1][j] + 1, matrix[i][j - 1] + 1, matrix[i - 1][j - 1] + cost) + + # for i in range(m): + # print(matrix[i]) + + distance = matrix[m - 1][n - 1] + return 1 - distance / max(len(left), len(right)) + diff --git a/utils/sys_util.py b/utils/sys_util.py new file mode 100644 index 0000000..264afc9 --- /dev/null +++ b/utils/sys_util.py @@ -0,0 +1,16 @@ +import os +import boto3 +from ec2_metadata import ec2_metadata + + +def stop_instance(): + try: + ec2_path = r"/home/ec2-user" + if os.path.exists(ec2_path): + current_ec2_id = ec2_metadata.instance_id + region = ec2_metadata.region + ec2 = boto3.client('ec2', region_name=region) + ec2.stop_instances(instance_ids=[current_ec2_id]) + except Exception as e: + print(e) + os.system("sudo shutdown now -h") \ No newline at end of file
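
A minimal usage sketch of how these utilities might be combined; the document id, directory, and data-point dictionary below are illustrative only and not part of the repository:

    from utils.pdf_download import download_pdf_from_documents_warehouse
    from utils.pdf_util import PDFUtil

    # Fetch the source PDF from S3 (assumes ACCESS_KEY, SECRET_KEY and BUCKET_NAME are set in .env).
    pdf_path = download_pdf_from_documents_warehouse(pdf_directory="./pdfs", doc_id="example-doc-id")

    # Describe the data points to locate; the keys mirror the fields read in PDFUtil.process_data.
    dp_value_info = {
        "FundName": {
            "biz_name": "Fund Name",
            "level": "Fund",
            "value": "Example Fund",
            "value_text": "Example Fund",
            "regex_list": [r"Example\s+Fund"],
        }
    }

    # Highlight matches in a copy of the PDF and collect bounding-box metadata per data point.
    results = PDFUtil(pdf_path).process_file(
        output_file="./pdfs/example-doc-id_highlighted.pdf",
        dp_Value_info=dp_value_info,
        action="Highlight",
    )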