Compare commits

...

7 Commits

Author SHA1 Message Date
takatost
0c6ad1df64 optimize conversation / message insert 2024-10-25 00:31:15 -07:00
takatost
40fb522f56 add trace 2024-10-24 02:34:10 -07:00
takatost
96d9951d5c disable redis cache 2024-10-24 02:03:29 -07:00
takatost
d36201f7ff remove unused code 2024-10-24 01:16:32 -07:00
takatost
b46c7935b1 test disable socket patch 2024-10-24 01:12:04 -07:00
takatost
206e6e1e7c add debug print 2024-10-24 00:54:10 -07:00
takatost
8685c0d48b test replace gevent socket patch in redis 2024-10-24 00:11:45 -07:00
6 changed files with 98 additions and 77 deletions

View File

@ -5,6 +5,7 @@ on:
branches: branches:
- "main" - "main"
- "deploy/dev" - "deploy/dev"
- "fix/redis-slow-in-gevent"
release: release:
types: [published] types: [published]

View File

@ -34,6 +34,11 @@ class RedisConfig(BaseSettings):
default=0, default=0,
) )
REDIS_MAX_CONNECTIONS: PositiveInt = Field(
description="Maximum number of connections to Redis",
default=200,
)
REDIS_USE_SSL: bool = Field( REDIS_USE_SSL: bool = Field(
description="Enable SSL/TLS for the Redis connection", description="Enable SSL/TLS for the Redis connection",
default=False, default=False,

View File

@ -226,13 +226,11 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
is_first_conversation = True is_first_conversation = True
# init generate records # init generate records
(conversation, message) = self._init_generate_records(application_generate_entity, conversation) (conversation, message) = self._init_generate_records(
application_generate_entity=application_generate_entity,
if is_first_conversation: conversation=conversation,
# update conversation features override_model_configs=workflow.features_dict if is_first_conversation else None,
conversation.override_model_configs = workflow.features )
db.session.commit()
db.session.refresh(conversation)
# init queue manager # init queue manager
queue_manager = MessageBasedAppQueueManager( queue_manager = MessageBasedAppQueueManager(

View File

@ -1,4 +1,5 @@
import logging import logging
import time
from collections.abc import Mapping from collections.abc import Mapping
from typing import Any, cast from typing import Any, cast
@ -101,6 +102,9 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
): ):
return return
# trace start time
start_time = time.perf_counter()
# Init conversation variables # Init conversation variables
stmt = select(ConversationVariable).where( stmt = select(ConversationVariable).where(
ConversationVariable.app_id == self.conversation.app_id, ConversationVariable.app_id == self.conversation.app_id,
@ -128,6 +132,13 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
conversation_dialogue_count = self.conversation.dialogue_count conversation_dialogue_count = self.conversation.dialogue_count
db.session.commit() db.session.commit()
# trace end time
end_time = time.perf_counter()
print(f"conversation_dialogue_count time: {end_time - start_time}")
# trace start time
start_time = time.perf_counter()
# Create a variable pool. # Create a variable pool.
system_inputs = { system_inputs = {
SystemVariableKey.QUERY: query, SystemVariableKey.QUERY: query,
@ -151,6 +162,10 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
# init graph # init graph
graph = self._init_graph(graph_config=workflow.graph_dict) graph = self._init_graph(graph_config=workflow.graph_dict)
# trace end time
end_time = time.perf_counter()
print(f"init graph time: {end_time - start_time}")
db.session.close() db.session.close()
# RUN WORKFLOW # RUN WORKFLOW

View File

@ -15,7 +15,6 @@ from core.app.entities.queue_entities import (
QueuePingEvent, QueuePingEvent,
QueueStopEvent, QueueStopEvent,
) )
from extensions.ext_redis import redis_client
class PublishFrom(Enum): class PublishFrom(Enum):
@ -32,10 +31,10 @@ class AppQueueManager:
self._user_id = user_id self._user_id = user_id
self._invoke_from = invoke_from self._invoke_from = invoke_from
user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user" # user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
redis_client.setex( # redis_client.setex(
AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}" # AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}"
) # )
q = queue.Queue() q = queue.Queue()
@ -114,26 +113,27 @@ class AppQueueManager:
Set task stop flag Set task stop flag
:return: :return:
""" """
result = redis_client.get(cls._generate_task_belong_cache_key(task_id)) return
if result is None: # result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
return # if result is None:
# return
user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user" # user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
if result.decode("utf-8") != f"{user_prefix}-{user_id}": # if result.decode("utf-8") != f"{user_prefix}-{user_id}":
return # return
stopped_cache_key = cls._generate_stopped_cache_key(task_id) # stopped_cache_key = cls._generate_stopped_cache_key(task_id)
redis_client.setex(stopped_cache_key, 600, 1) # redis_client.setex(stopped_cache_key, 600, 1)
def _is_stopped(self) -> bool: def _is_stopped(self) -> bool:
""" """
Check if task is stopped Check if task is stopped
:return: :return:
""" """
stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id) # stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id)
result = redis_client.get(stopped_cache_key) # result = redis_client.get(stopped_cache_key)
if result is not None: # if result is not None:
return True # return True
return False return False

View File

@ -1,8 +1,9 @@
import json import json
import logging import logging
from collections.abc import Generator import uuid
from collections.abc import Generator, Mapping
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional, Union from typing import Any, Optional, Union
from sqlalchemy import and_ from sqlalchemy import and_
@ -137,6 +138,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
AdvancedChatAppGenerateEntity, AdvancedChatAppGenerateEntity,
], ],
conversation: Optional[Conversation] = None, conversation: Optional[Conversation] = None,
override_model_configs: Optional[Mapping[str, Any]] = None,
) -> tuple[Conversation, Message]: ) -> tuple[Conversation, Message]:
""" """
Initialize generate records Initialize generate records
@ -158,14 +160,12 @@ class MessageBasedAppGenerator(BaseAppGenerator):
if isinstance(application_generate_entity, AdvancedChatAppGenerateEntity): if isinstance(application_generate_entity, AdvancedChatAppGenerateEntity):
app_model_config_id = None app_model_config_id = None
override_model_configs = None
model_provider = None model_provider = None
model_id = None model_id = None
else: else:
app_model_config_id = app_config.app_model_config_id app_model_config_id = app_config.app_model_config_id
model_provider = application_generate_entity.model_conf.provider model_provider = application_generate_entity.model_conf.provider
model_id = application_generate_entity.model_conf.model model_id = application_generate_entity.model_conf.model
override_model_configs = None
if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS and app_config.app_mode in { if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS and app_config.app_mode in {
AppMode.AGENT_CHAT, AppMode.AGENT_CHAT,
AppMode.CHAT, AppMode.CHAT,
@ -177,61 +177,63 @@ class MessageBasedAppGenerator(BaseAppGenerator):
introduction = self._get_conversation_introduction(application_generate_entity) introduction = self._get_conversation_introduction(application_generate_entity)
if not conversation: if not conversation:
conversation = Conversation( with db.Session(bind=db.engine, expire_on_commit=False) as session:
app_id=app_config.app_id, conversation = Conversation()
app_model_config_id=app_model_config_id, conversation.id = str(uuid.uuid4())
model_provider=model_provider, conversation.app_id = app_config.app_id
model_id=model_id, conversation.app_model_config_id = app_model_config_id
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None, conversation.model_provider = model_provider
mode=app_config.app_mode.value, conversation.model_id = model_id
name="New conversation", conversation.override_model_configs = (
inputs=application_generate_entity.inputs, json.dumps(override_model_configs) if override_model_configs else None
introduction=introduction, )
system_instruction="", conversation.mode = app_config.app_mode.value
system_instruction_tokens=0, conversation.name = "New conversation"
status="normal", conversation.inputs = application_generate_entity.inputs
invoke_from=application_generate_entity.invoke_from.value, conversation.introduction = introduction
from_source=from_source, conversation.system_instruction = ""
from_end_user_id=end_user_id, conversation.system_instruction_tokens = 0
from_account_id=account_id, conversation.status = "normal"
) conversation.invoke_from = application_generate_entity.invoke_from.value
conversation.from_source = from_source
conversation.from_end_user_id = end_user_id
conversation.from_account_id = account_id
db.session.add(conversation) session.add(conversation)
db.session.commit() session.commit()
db.session.refresh(conversation) session.refresh(conversation)
else: else:
conversation.updated_at = datetime.now(timezone.utc).replace(tzinfo=None) conversation.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
db.session.commit() db.session.commit()
message = Message( with db.Session(bind=db.engine, expire_on_commit=False) as session:
app_id=app_config.app_id, message = Message()
model_provider=model_provider, message.app_id = app_config.app_id
model_id=model_id, message.model_provider = model_provider
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None, message.model_id = model_id
conversation_id=conversation.id, message.override_model_configs = json.dumps(override_model_configs) if override_model_configs else None
inputs=application_generate_entity.inputs, message.conversation_id = conversation.id
query=application_generate_entity.query or "", message.inputs = application_generate_entity.inputs
message="", message.query = application_generate_entity.query or ""
message_tokens=0, message.message = ""
message_unit_price=0, message.message_tokens = 0
message_price_unit=0, message.message_unit_price = 0
answer="", message.answer = ""
answer_tokens=0, message.answer_tokens = 0
answer_unit_price=0, message.answer_unit_price = 0
answer_price_unit=0, message.answer_price_unit = 0
parent_message_id=getattr(application_generate_entity, "parent_message_id", None), message.parent_message_id = getattr(application_generate_entity, "parent_message_id", None)
provider_response_latency=0, message.provider_response_latency = 0
total_price=0, message.total_price = 0
currency="USD", message.currency = "USD"
invoke_from=application_generate_entity.invoke_from.value, message.invoke_from = application_generate_entity.invoke_from.value
from_source=from_source, message.from_source = from_source
from_end_user_id=end_user_id, message.from_end_user_id = end_user_id
from_account_id=account_id, message.from_account_id = account_id
)
db.session.add(message) session.add(message)
db.session.commit() session.commit()
db.session.refresh(message) session.refresh(message)
for file in application_generate_entity.files: for file in application_generate_entity.files:
message_file = MessageFile( message_file = MessageFile(