# from transformers import GPT2TokenizerFast
import base64
import os
from time import sleep

import dotenv
import openai
import tiktoken
from openai import AzureOpenAI

# loads .env file with your OPENAI_API_KEY
dotenv.load_dotenv()


def get_embedding(text, engine=os.getenv("EMBEDDING_ENGINE")):
    """Returns the embedding vector for `text`, retrying up to 5 times.

    NOTE: this uses the legacy (pre-1.0) `openai.Embedding` interface; with
    openai>=1.0 the call would need to go through a client, e.g.
    `client.embeddings.create(...)`.
    """
    count = 0
    while count < 5:
        try:
            if count > 0:
                print(f"retry {count} of 5 for text embedding...")
            return openai.Embedding.create(input=text, engine=engine)["data"][0][
                "embedding"
            ]
        except Exception as e:
            print(str(e))
            count += 1
            sleep(1)
    return None  # all retries failed


def num_tokens_from_string(string: str) -> int:
    """Returns the number of tokens in a text string."""
    tokenizer = tiktoken.get_encoding("cl100k_base")
    return len(tokenizer.encode(string))


def num_tokens_from_messages(messages, model="gpt-35-turbo-16k"):
    """Returns the number of tokens used by a list of messages."""
    encoding = tiktoken.get_encoding("cl100k_base")
    if model == "gpt-35-turbo-16k":
        # every message follows <|start|>{role/name}\n{content}<|end|>\n
        tokens_per_message = 4
        tokens_per_name = -1  # if there's a name, the role is omitted
    elif model == "gpt-4-32k":
        tokens_per_message = 3
        tokens_per_name = 1
    else:
        tokens_per_message = 3
        tokens_per_name = 1
    num_tokens = 0
    for message in messages:
        num_tokens += tokens_per_message
        for key, value in message.items():
            num_tokens += len(encoding.encode(value))
            if key == "name":
                num_tokens += tokens_per_name
    num_tokens += 3  # every reply is primed with <|start|>assistant<|message|>
    return num_tokens


def chat(
    prompt: str,
    system_prompt: str = None,
    engine=os.getenv("Engine_GPT4o"),
    azure_endpoint=os.getenv("OPENAI_API_BASE_GPT4o"),
    api_key=os.getenv("OPENAI_API_KEY_GPT4o"),
    api_version=os.getenv("OPENAI_API_VERSION_GPT4o"),
    temperature: float = 0.0,
    max_tokens=10240,
    response_format: dict = None,
    image_file: str = None,
    image_base64: str = None,
):
    """Sends a chat completion request (optionally with an image) to Azure OpenAI.

    Returns a tuple `(result, failed)`, where `result` holds the response text
    and token usage, and `failed` is True if the request could not be served.
    """
    if not engine.startswith("gpt-4o"):
        # non-gpt-4o deployments are capped at 4096 completion tokens here
        max_tokens = 4096
    client = AzureOpenAI(
        azure_endpoint=azure_endpoint, api_key=api_key, api_version=api_version
    )
    if (
        image_base64 is None
        and image_file is not None
        and len(image_file) > 0
        and os.path.exists(image_file)
    ):
        image_base64 = encode_image(image_file)
    if image_base64 is not None and len(image_base64) > 0:
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{image_base64}"},
                    },
                ],
            }
        ]
    else:
        messages = [{"role": "user", "content": prompt}]
    if system_prompt is not None and len(system_prompt) > 0:
        messages.insert(0, {"role": "system", "content": system_prompt})

    count = 0
    result = {}
    request_timeout = 600
    while count < 8:
        response = None
        try:
            if count > 0:
                print(f"retry {count} of 8...")
            request_kwargs = dict(
                model=engine,
                temperature=temperature,
                max_tokens=max_tokens,
                top_p=0.95,
                frequency_penalty=0,
                presence_penalty=0,
                timeout=request_timeout,
                stop=None,
                messages=messages,
            )
            if response_format is not None:
                # e.g. response_format={"type": "json_object"}
                request_kwargs["response_format"] = response_format
            response = client.chat.completions.create(**request_kwargs)
            sleep(1)
            result["full_response"] = response
            result["response"] = response.choices[0].message.content
            result["prompt_token"] = response.usage.prompt_tokens
            result["completion_token"] = response.usage.completion_tokens
            result["total_token"] = response.usage.total_tokens
            return result, False
        except Exception as e:
            error = str(e)
            print(f"error message: {error}")
            if "maximum context length" in error:
                # context-length errors will not succeed on retry, so give up
                result["full_response"] = response
                result["response"] = error
                # usage is only available if a response object was returned
                if response is not None and response.usage is not None:
                    result["prompt_token"] = response.usage.prompt_tokens
                    result["completion_token"] = response.usage.completion_tokens
                    result["total_token"] = response.usage.total_tokens
                return result, True
            count += 1
            sleep(2)
    return result, True


def encode_image(image_path: str):
    """Returns the base64-encoded contents of an image file, or None if it is missing."""
    if image_path is None or len(image_path) == 0 or not os.path.exists(image_path):
        return None
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")
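

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the original module). It
# assumes the .env file defines the variables read above: Engine_GPT4o,
# OPENAI_API_BASE_GPT4o, OPENAI_API_KEY_GPT4o, and OPENAI_API_VERSION_GPT4o.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    demo_messages = [{"role": "user", "content": "Summarize this file in one line."}]
    print("estimated prompt tokens:", num_tokens_from_messages(demo_messages))

    result, failed = chat(
        prompt="Summarize this file in one line.",
        system_prompt="You are a concise assistant.",
    )
    if not failed:
        print(result["response"])
        print("total tokens used:", result["total_token"])
    else:
        print("chat request failed:", result.get("response"))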