diff --git a/api/app.py b/api/app.py index 634f44d5e5..ed214bde97 100644 --- a/api/app.py +++ b/api/app.py @@ -3,14 +3,9 @@ import os from configs import dify_config if os.environ.get("DEBUG", "false").lower() != "true": - if os.environ.get("REDIS_GEVENT_SUPPORT", "false").lower() == "true": - from gevent import monkey + from gevent import monkey - monkey.patch_all(socket=False) - else: - from gevent import monkey - - monkey.patch_all() + monkey.patch_all() import grpc.experimental.gevent diff --git a/api/core/app/apps/base_app_queue_manager.py b/api/core/app/apps/base_app_queue_manager.py index 4c4d282e99..d16fd3c056 100644 --- a/api/core/app/apps/base_app_queue_manager.py +++ b/api/core/app/apps/base_app_queue_manager.py @@ -15,7 +15,6 @@ from core.app.entities.queue_entities import ( QueuePingEvent, QueueStopEvent, ) -from extensions.ext_redis import redis_client class PublishFrom(Enum): @@ -32,10 +31,10 @@ class AppQueueManager: self._user_id = user_id self._invoke_from = invoke_from - user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user" - redis_client.setex( - AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}" - ) + # user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user" + # redis_client.setex( + # AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}" + # ) q = queue.Queue() @@ -114,26 +113,27 @@ class AppQueueManager: Set task stop flag :return: """ - result = redis_client.get(cls._generate_task_belong_cache_key(task_id)) - if result is None: - return + return + # result = redis_client.get(cls._generate_task_belong_cache_key(task_id)) + # if result is None: + # return - user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user" - if result.decode("utf-8") != f"{user_prefix}-{user_id}": - return + # user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user" + # if result.decode("utf-8") != f"{user_prefix}-{user_id}": + # return - stopped_cache_key = cls._generate_stopped_cache_key(task_id) - redis_client.setex(stopped_cache_key, 600, 1) + # stopped_cache_key = cls._generate_stopped_cache_key(task_id) + # redis_client.setex(stopped_cache_key, 600, 1) def _is_stopped(self) -> bool: """ Check if task is stopped :return: """ - stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id) - result = redis_client.get(stopped_cache_key) - if result is not None: - return True + # stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id) + # result = redis_client.get(stopped_cache_key) + # if result is not None: + # return True return False diff --git a/api/extensions/ext_redis.py b/api/extensions/ext_redis.py index a9ab5304df..e1f8409f21 100644 --- a/api/extensions/ext_redis.py +++ b/api/extensions/ext_redis.py @@ -1,7 +1,5 @@ -import socket - import redis -from redis.connection import SSLConnection +from redis.connection import Connection, SSLConnection from redis.sentinel import Sentinel from configs import dify_config @@ -44,56 +42,9 @@ class RedisClientWrapper(redis.Redis): redis_client = RedisClientWrapper() -class GeventSafeConnection(redis.Connection): - socket_socket_class: type[socket.socket] | None = None - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def _connect(self): - "Create a TCP socket connection" - # we want to mimic what socket.create_connection does to support - # ipv4/ipv6, but we want to set options prior to calling - # socket.connect() - err = None - for res in socket.getaddrinfo(self.host, self.port, self.socket_type, socket.SOCK_STREAM): - family, socktype, proto, canonname, socket_address = res - sock = None - try: - socket_socket_class = self.socket_socket_class or socket.socket - sock = socket_socket_class(family, socktype, proto) - # TCP_NODELAY - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - - # TCP_KEEPALIVE - if self.socket_keepalive: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - for k, v in self.socket_keepalive_options.items(): - sock.setsockopt(socket.IPPROTO_TCP, k, v) - - # set the socket_connect_timeout before we connect - sock.settimeout(self.socket_connect_timeout) - - # connect - sock.connect(socket_address) - - # set the socket_timeout now that we're connected - sock.settimeout(self.socket_timeout) - return sock - - except OSError as _: - err = _ - if sock is not None: - sock.close() - - if err is not None: - raise err - raise OSError("socket.getaddrinfo returned an empty list") - - def init_app(app): global redis_client - connection_class = GeventSafeConnection + connection_class = Connection if dify_config.REDIS_USE_SSL: connection_class = SSLConnection