dc-ml-emea-ar/utils/gpt_utils.py

157 lines
4.8 KiB
Python
Raw Normal View History

2024-08-19 14:52:13 +00:00
# from transformers import GPT2TokenizerFast
import tiktoken
from openai import AzureOpenAI
import openai
import os
from time import sleep
2024-08-26 16:19:07 +00:00
import base64
2024-08-19 14:52:13 +00:00
import dotenv
2024-08-26 16:19:07 +00:00
2024-08-19 14:52:13 +00:00
# Load environment variables from the local .env file (e.g. OPENAI_API_KEY).
# This runs before the function definitions below, whose default arguments
# read their values with os.getenv() at definition time.
dotenv.load_dotenv()
# Module-level tiktoken tokenizer using the "cl100k_base" encoding; it
# replaces the previously used GPT2TokenizerFast.from_pretrained("gpt2").
tokenizer = tiktoken.get_encoding("cl100k_base")
2024-08-26 16:19:07 +00:00
2024-08-19 14:52:13 +00:00
def get_embedding(
    text,
    engine=os.getenv("EMBEDDING_ENGINE"),
    max_retries: int = 5,
    retry_delay: float = 1,
):
    """Return the embedding vector for ``text``, retrying on failure.

    Args:
        text: Input passed to the embedding endpoint as ``input``.
        engine: Embedding deployment/engine name. The default is read from
            the ``EMBEDDING_ENGINE`` environment variable when this module
            is imported.
        max_retries: Maximum number of attempts before giving up.
        retry_delay: Seconds to wait between two consecutive attempts.

    Returns:
        The ``["data"][0]["embedding"]`` entry of the API response, or
        ``None`` when every attempt raised an exception.
    """
    for attempt in range(max_retries):
        if attempt > 0:
            print(f"retrying the {attempt} time for getting text embedding...")
        try:
            # NOTE(review): `openai.Embedding` is the pre-1.0 SDK interface,
            # while this module also imports `AzureOpenAI` (1.x SDK). Confirm
            # which SDK version is installed; under 1.x this call is expected
            # to fail and should be migrated to `client.embeddings.create`.
            return openai.Embedding.create(input=text, engine=engine)["data"][0][
                "embedding"
            ]
        except Exception as e:
            # Best-effort helper: report the failure and try again.
            print(str(e))
        # Only wait when another attempt will actually follow.
        if attempt < max_retries - 1:
            sleep(retry_delay)
    # All attempts failed; make the "no embedding" result explicit.
    return None
def num_tokens_from_string(string: str) -> int:
    """Count the tokens that ``string`` encodes to.

    The module-level ``tokenizer`` is used for the encoding.
    """
    return len(tokenizer.encode(string))
def num_tokens_from_messages(messages, model="gpt-35-turbo-16k"):
    """Return the number of tokens used by a list of chat messages.

    Args:
        messages: Iterable of message dicts (e.g. ``{"role": ..., "content":
            ...}``). String values are tokenized directly. A list value is
            treated as multi-part content and only the ``"text"`` entry of
            each dict part is tokenized; any other non-string value adds no
            tokens.
        model: Model name used to pick the per-message overhead.

    Returns:
        The token count, including the per-message overhead and the 3 tokens
        that prime the assistant reply. Non-text parts (such as image
        payloads) are not counted, so for multi-part messages the result
        only covers the text.
    """
    encoding = tiktoken.get_encoding("cl100k_base")
    if model == "gpt-35-turbo-16k":
        # every message follows <|start|>{role/name}\n{content}<|end|>\n
        tokens_per_message = 4
        tokens_per_name = -1  # if there's a name, the role is omitted
    else:
        # "gpt-4-32k" and every other model share the same overhead.
        tokens_per_message = 3
        tokens_per_name = 1

    num_tokens = 0
    for message in messages:
        num_tokens += tokens_per_message
        for key, value in message.items():
            if isinstance(value, str):
                num_tokens += len(encoding.encode(value))
            elif isinstance(value, list):
                # Multi-part content: count the text parts, skip the rest.
                for part in value:
                    if isinstance(part, dict) and isinstance(part.get("text"), str):
                        num_tokens += len(encoding.encode(part["text"]))
            if key == "name":
                num_tokens += tokens_per_name
    num_tokens += 3  # every reply is primed with <|start|>assistant<|message|>
    return num_tokens
2024-08-26 16:19:07 +00:00
def chat(
    prompt: str,
    engine=os.getenv("Engine_GPT4o"),
    azure_endpoint=os.getenv("OPENAI_API_BASE_GPT4o"),
    api_key=os.getenv("OPENAI_API_KEY_GPT4o"),
    api_version=os.getenv("OPENAI_API_VERSION_GPT4o"),
    temperature: float = 0.0,
    max_tokens=10240,
    response_format: dict = None,
    image_file: str = None,
    image_base64: str = None,
):
    """Send a single-turn prompt (optionally with an image) to Azure OpenAI.

    Args:
        prompt: User prompt text.
        engine: Deployment name passed as ``model``. The environment-based
            defaults of this and the next three parameters are read when the
            module is imported.
        azure_endpoint: Azure OpenAI endpoint for the client.
        api_key: API key for the client.
        api_version: API version for the client.
        temperature: Sampling temperature.
        max_tokens: Completion token limit. For every engine other than
            ``"gpt-4o-2024-08-06-research"`` it is capped at 4096.
        response_format: Optional ``response_format`` payload, e.g.
            ``{"type": "json_object"}``; omitted from the request when None.
        image_file: Optional path of an image to attach; only used when
            ``image_base64`` is None.
        image_base64: Optional base64-encoded image to attach.

    Returns:
        ``(content, False)`` with the first choice's message content on
        success, or ``(error_message, True)`` when the request hit a
        "maximum context length" error or all 8 attempts failed.
    """
    if engine != "gpt-4o-2024-08-06-research":
        # Cap the limit instead of overwriting it, so a smaller limit
        # requested by the caller is respected.
        max_tokens = 4096 if max_tokens is None else min(max_tokens, 4096)

    client = AzureOpenAI(
        azure_endpoint=azure_endpoint, api_key=api_key, api_version=api_version
    )

    # encode_image returns None for a missing path, leaving the prompt text-only.
    if image_base64 is None and image_file:
        image_base64 = encode_image(image_file)

    if image_base64:
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{image_base64}"},
                    },
                ],
            }
        ]
    else:
        messages = [{"role": "user", "content": prompt}]

    # Build the request once; response_format is only sent when provided.
    request_kwargs = {
        "model": engine,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "top_p": 0.95,
        "frequency_penalty": 0,
        "presence_penalty": 0,
        "timeout": 120,
        "stop": None,
        "messages": messages,
    }
    if response_format is not None:
        request_kwargs["response_format"] = response_format

    max_attempts = 8
    error = ""
    for attempt in range(max_attempts):
        try:
            if attempt > 0:
                print(f"retrying the {attempt} time...")
            response = client.chat.completions.create(**request_kwargs)
            return response.choices[0].message.content, False
        except Exception as e:
            error = str(e)
            print(f"error message: {error}")
            # Retrying cannot help when the prompt itself is too long.
            if "maximum context length" in error:
                return error, True
        # Only wait when another attempt will actually follow.
        if attempt < max_attempts - 1:
            sleep(3)
    return error, True
def encode_image(image_path: str):
    """Return the contents of an image file as a base64 string.

    Args:
        image_path: Path of the file to read.

    Returns:
        The file's bytes, base64-encoded and decoded as UTF-8 text, or
        ``None`` when ``image_path`` is None, empty, or does not point to an
        existing regular file (a directory also yields ``None``).
    """
    # isfile (rather than exists) keeps a directory path from reaching open().
    if not image_path or not os.path.isfile(image_path):
        return None
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")