disable redis cache

This commit is contained in:
takatost 2024-10-24 02:03:29 -07:00
parent d36201f7ff
commit 96d9951d5c
3 changed files with 21 additions and 75 deletions

View File

@ -3,14 +3,9 @@ import os
from configs import dify_config from configs import dify_config
if os.environ.get("DEBUG", "false").lower() != "true": if os.environ.get("DEBUG", "false").lower() != "true":
if os.environ.get("REDIS_GEVENT_SUPPORT", "false").lower() == "true": from gevent import monkey
from gevent import monkey
monkey.patch_all(socket=False) monkey.patch_all()
else:
from gevent import monkey
monkey.patch_all()
import grpc.experimental.gevent import grpc.experimental.gevent

View File

@ -15,7 +15,6 @@ from core.app.entities.queue_entities import (
QueuePingEvent, QueuePingEvent,
QueueStopEvent, QueueStopEvent,
) )
from extensions.ext_redis import redis_client
class PublishFrom(Enum): class PublishFrom(Enum):
@ -32,10 +31,10 @@ class AppQueueManager:
self._user_id = user_id self._user_id = user_id
self._invoke_from = invoke_from self._invoke_from = invoke_from
user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user" # user_prefix = "account" if self._invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
redis_client.setex( # redis_client.setex(
AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}" # AppQueueManager._generate_task_belong_cache_key(self._task_id), 1800, f"{user_prefix}-{self._user_id}"
) # )
q = queue.Queue() q = queue.Queue()
@ -114,26 +113,27 @@ class AppQueueManager:
Set task stop flag Set task stop flag
:return: :return:
""" """
result = redis_client.get(cls._generate_task_belong_cache_key(task_id)) return
if result is None: # result = redis_client.get(cls._generate_task_belong_cache_key(task_id))
return # if result is None:
# return
user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user" # user_prefix = "account" if invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER} else "end-user"
if result.decode("utf-8") != f"{user_prefix}-{user_id}": # if result.decode("utf-8") != f"{user_prefix}-{user_id}":
return # return
stopped_cache_key = cls._generate_stopped_cache_key(task_id) # stopped_cache_key = cls._generate_stopped_cache_key(task_id)
redis_client.setex(stopped_cache_key, 600, 1) # redis_client.setex(stopped_cache_key, 600, 1)
def _is_stopped(self) -> bool: def _is_stopped(self) -> bool:
""" """
Check if task is stopped Check if task is stopped
:return: :return:
""" """
stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id) # stopped_cache_key = AppQueueManager._generate_stopped_cache_key(self._task_id)
result = redis_client.get(stopped_cache_key) # result = redis_client.get(stopped_cache_key)
if result is not None: # if result is not None:
return True # return True
return False return False

View File

@ -1,7 +1,5 @@
import socket
import redis import redis
from redis.connection import SSLConnection from redis.connection import Connection, SSLConnection
from redis.sentinel import Sentinel from redis.sentinel import Sentinel
from configs import dify_config from configs import dify_config
@ -44,56 +42,9 @@ class RedisClientWrapper(redis.Redis):
redis_client = RedisClientWrapper() redis_client = RedisClientWrapper()
class GeventSafeConnection(redis.Connection):
socket_socket_class: type[socket.socket] | None = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def _connect(self):
"Create a TCP socket connection"
# we want to mimic what socket.create_connection does to support
# ipv4/ipv6, but we want to set options prior to calling
# socket.connect()
err = None
for res in socket.getaddrinfo(self.host, self.port, self.socket_type, socket.SOCK_STREAM):
family, socktype, proto, canonname, socket_address = res
sock = None
try:
socket_socket_class = self.socket_socket_class or socket.socket
sock = socket_socket_class(family, socktype, proto)
# TCP_NODELAY
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# TCP_KEEPALIVE
if self.socket_keepalive:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
for k, v in self.socket_keepalive_options.items():
sock.setsockopt(socket.IPPROTO_TCP, k, v)
# set the socket_connect_timeout before we connect
sock.settimeout(self.socket_connect_timeout)
# connect
sock.connect(socket_address)
# set the socket_timeout now that we're connected
sock.settimeout(self.socket_timeout)
return sock
except OSError as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
raise OSError("socket.getaddrinfo returned an empty list")
def init_app(app): def init_app(app):
global redis_client global redis_client
connection_class = GeventSafeConnection connection_class = Connection
if dify_config.REDIS_USE_SSL: if dify_config.REDIS_USE_SSL:
connection_class = SSLConnection connection_class = SSLConnection