Compare commits
7 Commits
main
...
fix/redis-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0c6ad1df64 | ||
|
|
40fb522f56 | ||
|
|
96d9951d5c | ||
|
|
d36201f7ff | ||
|
|
b46c7935b1 | ||
|
|
206e6e1e7c | ||
|
|
8685c0d48b |
1
.github/workflows/build-push.yml
vendored
1
.github/workflows/build-push.yml
vendored
@ -5,6 +5,7 @@ on:
|
|||||||
branches:
|
branches:
|
||||||
- "main"
|
- "main"
|
||||||
- "deploy/dev"
|
- "deploy/dev"
|
||||||
|
- "fix/redis-slow-in-gevent"
|
||||||
release:
|
release:
|
||||||
types: [published]
|
types: [published]
|
||||||
|
|
||||||
|
|||||||
5
api/configs/middleware/cache/redis_config.py
vendored
5
api/configs/middleware/cache/redis_config.py
vendored
@ -34,6 +34,11 @@ class RedisConfig(BaseSettings):
|
|||||||
default=0,
|
default=0,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
REDIS_MAX_CONNECTIONS: PositiveInt = Field(
|
||||||
|
description="Maximum number of connections to Redis",
|
||||||
|
default=200,
|
||||||
|
)
|
||||||
|
|
||||||
REDIS_USE_SSL: bool = Field(
|
REDIS_USE_SSL: bool = Field(
|
||||||
description="Enable SSL/TLS for the Redis connection",
|
description="Enable SSL/TLS for the Redis connection",
|
||||||
default=False,
|
default=False,
|
||||||
|
|||||||
@ -226,13 +226,11 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
|
|||||||
is_first_conversation = True
|
is_first_conversation = True
|
||||||
|
|
||||||
# init generate records
|
# init generate records
|
||||||
(conversation, message) = self._init_generate_records(application_generate_entity, conversation)
|
(conversation, message) = self._init_generate_records(
|
||||||
|
application_generate_entity=application_generate_entity,
|
||||||
if is_first_conversation:
|
conversation=conversation,
|
||||||
# update conversation features
|
override_model_configs=workflow.features_dict if is_first_conversation else None,
|
||||||
conversation.override_model_configs = workflow.features
|
)
|
||||||
db.session.commit()
|
|
||||||
db.session.refresh(conversation)
|
|
||||||
|
|
||||||
# init queue manager
|
# init queue manager
|
||||||
queue_manager = MessageBasedAppQueueManager(
|
queue_manager = MessageBasedAppQueueManager(
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
from collections.abc import Mapping
|
from collections.abc import Mapping
|
||||||
from typing import Any, cast
|
from typing import Any, cast
|
||||||
|
|
||||||
@ -101,6 +102,9 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
|||||||
):
|
):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# trace start time
|
||||||
|
start_time = time.perf_counter()
|
||||||
|
|
||||||
# Init conversation variables
|
# Init conversation variables
|
||||||
stmt = select(ConversationVariable).where(
|
stmt = select(ConversationVariable).where(
|
||||||
ConversationVariable.app_id == self.conversation.app_id,
|
ConversationVariable.app_id == self.conversation.app_id,
|
||||||
@ -128,6 +132,13 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
|||||||
conversation_dialogue_count = self.conversation.dialogue_count
|
conversation_dialogue_count = self.conversation.dialogue_count
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
# trace end time
|
||||||
|
end_time = time.perf_counter()
|
||||||
|
print(f"conversation_dialogue_count time: {end_time - start_time}")
|
||||||
|
|
||||||
|
# trace start time
|
||||||
|
start_time = time.perf_counter()
|
||||||
|
|
||||||
# Create a variable pool.
|
# Create a variable pool.
|
||||||
system_inputs = {
|
system_inputs = {
|
||||||
SystemVariableKey.QUERY: query,
|
SystemVariableKey.QUERY: query,
|
||||||
@ -151,6 +162,10 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
|
|||||||
# init graph
|
# init graph
|
||||||
graph = self._init_graph(graph_config=workflow.graph_dict)
|
graph = self._init_graph(graph_config=workflow.graph_dict)
|
||||||
|
|
||||||
|
# trace end time
|
||||||
|
end_time = time.perf_counter()
|
||||||
|
print(f"init graph time: {end_time - start_time}")
|
||||||
|
|
||||||
db.session.close()
|
db.session.close()
|
||||||
|
|
||||||
# RUN WORKFLOW
|
# RUN WORKFLOW
|
||||||
|
|||||||
@ -15,7 +15,6 @@ from core.app.entities.queue_entities import (
|
|||||||
QueuePingEvent,
|
QueuePingEvent,
|
||||||
QueueStopEvent,
|
QueueStopEvent,
|
||||||
)
|
)
|
||||||
from extensions.ext_redis import redis_client
|
|
||||||
|
|
||||||
|
|
||||||
class PublishFrom(Enum):
|
class PublishFrom(Enum):
|
||||||
@ -32,10 +31,10 @@ class AppQueueManager:
|
|||||||
self._user_id = user_id
|
self._user_id = user_id
|
||||||
self._invoke_from = invoke_from
|
self._invoke_from = invoke_from
|
||||||
|
|
||||||
user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
|
# user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
|
||||||
redis_client.setex(
|
# redis_client.setex(
|
||||||
AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}"
|
# AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}"
|
||||||
)
|
# )
|
||||||
|
|
||||||
q = queue.Queue()
|
q = queue.Queue()
|
||||||
|
|
||||||
@ -114,26 +113,27 @@ class AppQueueManager:
|
|||||||
Set task stop flag
|
Set task stop flag
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
|
return
|
||||||
if result is None:
|
# result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
|
||||||
return
|
# if result is None:
|
||||||
|
# return
|
||||||
|
|
||||||
user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
|
# user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
|
||||||
if result.decode("utf-8") != f"{user_prefix}-{user_id}":
|
# if result.decode("utf-8") != f"{user_prefix}-{user_id}":
|
||||||
return
|
# return
|
||||||
|
|
||||||
stopped_cache_key = cls._generate_stopped_cache_key(task_id)
|
# stopped_cache_key = cls._generate_stopped_cache_key(task_id)
|
||||||
redis_client.setex(stopped_cache_key, 600, 1)
|
# redis_client.setex(stopped_cache_key, 600, 1)
|
||||||
|
|
||||||
def _is_stopped(self) -> bool:
|
def _is_stopped(self) -> bool:
|
||||||
"""
|
"""
|
||||||
Check if task is stopped
|
Check if task is stopped
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id)
|
# stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id)
|
||||||
result = redis_client.get(stopped_cache_key)
|
# result = redis_client.get(stopped_cache_key)
|
||||||
if result is not None:
|
# if result is not None:
|
||||||
return True
|
# return True
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|||||||
@ -1,8 +1,9 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
from collections.abc import Generator
|
import uuid
|
||||||
|
from collections.abc import Generator, Mapping
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import Optional, Union
|
from typing import Any, Optional, Union
|
||||||
|
|
||||||
from sqlalchemy import and_
|
from sqlalchemy import and_
|
||||||
|
|
||||||
@ -137,6 +138,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
|||||||
AdvancedChatAppGenerateEntity,
|
AdvancedChatAppGenerateEntity,
|
||||||
],
|
],
|
||||||
conversation: Optional[Conversation] = None,
|
conversation: Optional[Conversation] = None,
|
||||||
|
override_model_configs: Optional[Mapping[str, Any]] = None,
|
||||||
) -> tuple[Conversation, Message]:
|
) -> tuple[Conversation, Message]:
|
||||||
"""
|
"""
|
||||||
Initialize generate records
|
Initialize generate records
|
||||||
@ -158,14 +160,12 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
|||||||
|
|
||||||
if isinstance(application_generate_entity, AdvancedChatAppGenerateEntity):
|
if isinstance(application_generate_entity, AdvancedChatAppGenerateEntity):
|
||||||
app_model_config_id = None
|
app_model_config_id = None
|
||||||
override_model_configs = None
|
|
||||||
model_provider = None
|
model_provider = None
|
||||||
model_id = None
|
model_id = None
|
||||||
else:
|
else:
|
||||||
app_model_config_id = app_config.app_model_config_id
|
app_model_config_id = app_config.app_model_config_id
|
||||||
model_provider = application_generate_entity.model_conf.provider
|
model_provider = application_generate_entity.model_conf.provider
|
||||||
model_id = application_generate_entity.model_conf.model
|
model_id = application_generate_entity.model_conf.model
|
||||||
override_model_configs = None
|
|
||||||
if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS and app_config.app_mode in {
|
if app_config.app_model_config_from == EasyUIBasedAppModelConfigFrom.ARGS and app_config.app_mode in {
|
||||||
AppMode.AGENT_CHAT,
|
AppMode.AGENT_CHAT,
|
||||||
AppMode.CHAT,
|
AppMode.CHAT,
|
||||||
@ -177,61 +177,63 @@ class MessageBasedAppGenerator(BaseAppGenerator):
|
|||||||
introduction = self._get_conversation_introduction(application_generate_entity)
|
introduction = self._get_conversation_introduction(application_generate_entity)
|
||||||
|
|
||||||
if not conversation:
|
if not conversation:
|
||||||
conversation = Conversation(
|
with db.Session(bind=db.engine, expire_on_commit=False) as session:
|
||||||
app_id=app_config.app_id,
|
conversation = Conversation()
|
||||||
app_model_config_id=app_model_config_id,
|
conversation.id = str(uuid.uuid4())
|
||||||
model_provider=model_provider,
|
conversation.app_id = app_config.app_id
|
||||||
model_id=model_id,
|
conversation.app_model_config_id = app_model_config_id
|
||||||
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
|
conversation.model_provider = model_provider
|
||||||
mode=app_config.app_mode.value,
|
conversation.model_id = model_id
|
||||||
name="New conversation",
|
conversation.override_model_configs = (
|
||||||
inputs=application_generate_entity.inputs,
|
json.dumps(override_model_configs) if override_model_configs else None
|
||||||
introduction=introduction,
|
)
|
||||||
system_instruction="",
|
conversation.mode = app_config.app_mode.value
|
||||||
system_instruction_tokens=0,
|
conversation.name = "New conversation"
|
||||||
status="normal",
|
conversation.inputs = application_generate_entity.inputs
|
||||||
invoke_from=application_generate_entity.invoke_from.value,
|
conversation.introduction = introduction
|
||||||
from_source=from_source,
|
conversation.system_instruction = ""
|
||||||
from_end_user_id=end_user_id,
|
conversation.system_instruction_tokens = 0
|
||||||
from_account_id=account_id,
|
conversation.status = "normal"
|
||||||
)
|
conversation.invoke_from = application_generate_entity.invoke_from.value
|
||||||
|
conversation.from_source = from_source
|
||||||
|
conversation.from_end_user_id = end_user_id
|
||||||
|
conversation.from_account_id = account_id
|
||||||
|
|
||||||
db.session.add(conversation)
|
session.add(conversation)
|
||||||
db.session.commit()
|
session.commit()
|
||||||
db.session.refresh(conversation)
|
session.refresh(conversation)
|
||||||
else:
|
else:
|
||||||
conversation.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
|
conversation.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
message = Message(
|
with db.Session(bind=db.engine, expire_on_commit=False) as session:
|
||||||
app_id=app_config.app_id,
|
message = Message()
|
||||||
model_provider=model_provider,
|
message.app_id = app_config.app_id
|
||||||
model_id=model_id,
|
message.model_provider = model_provider
|
||||||
override_model_configs=json.dumps(override_model_configs) if override_model_configs else None,
|
message.model_id = model_id
|
||||||
conversation_id=conversation.id,
|
message.override_model_configs = json.dumps(override_model_configs) if override_model_configs else None
|
||||||
inputs=application_generate_entity.inputs,
|
message.conversation_id = conversation.id
|
||||||
query=application_generate_entity.query or "",
|
message.inputs = application_generate_entity.inputs
|
||||||
message="",
|
message.query = application_generate_entity.query or ""
|
||||||
message_tokens=0,
|
message.message = ""
|
||||||
message_unit_price=0,
|
message.message_tokens = 0
|
||||||
message_price_unit=0,
|
message.message_unit_price = 0
|
||||||
answer="",
|
message.answer = ""
|
||||||
answer_tokens=0,
|
message.answer_tokens = 0
|
||||||
answer_unit_price=0,
|
message.answer_unit_price = 0
|
||||||
answer_price_unit=0,
|
message.answer_price_unit = 0
|
||||||
parent_message_id=getattr(application_generate_entity, "parent_message_id", None),
|
message.parent_message_id = getattr(application_generate_entity, "parent_message_id", None)
|
||||||
provider_response_latency=0,
|
message.provider_response_latency = 0
|
||||||
total_price=0,
|
message.total_price = 0
|
||||||
currency="USD",
|
message.currency = "USD"
|
||||||
invoke_from=application_generate_entity.invoke_from.value,
|
message.invoke_from = application_generate_entity.invoke_from.value
|
||||||
from_source=from_source,
|
message.from_source = from_source
|
||||||
from_end_user_id=end_user_id,
|
message.from_end_user_id = end_user_id
|
||||||
from_account_id=account_id,
|
message.from_account_id = account_id
|
||||||
)
|
|
||||||
|
|
||||||
db.session.add(message)
|
session.add(message)
|
||||||
db.session.commit()
|
session.commit()
|
||||||
db.session.refresh(message)
|
session.refresh(message)
|
||||||
|
|
||||||
for file in application_generate_entity.files:
|
for file in application_generate_entity.files:
|
||||||
message_file = MessageFile(
|
message_file = MessageFile(
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user