From 6460282f4851e3689c83ac72dbc97e24704c3f37 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Tue, 4 Mar 2025 15:34:58 +0800 Subject: [PATCH] feat: implement SQLAlchemy 2.0 style pagination for workflow app logs with time-based filtering Signed-off-by: -LAN- --- .../console/app/workflow_app_log.py | 11 ++-- api/controllers/service_api/app/workflow.py | 12 ++-- api/fields/workflow_app_log_fields.py | 2 +- api/services/workflow_app_service.py | 57 ++++++++++++------- 4 files changed, 54 insertions(+), 28 deletions(-) diff --git a/api/controllers/console/app/workflow_app_log.py b/api/controllers/console/app/workflow_app_log.py index 3f121411c9..e35fdc2c63 100644 --- a/api/controllers/console/app/workflow_app_log.py +++ b/api/controllers/console/app/workflow_app_log.py @@ -1,9 +1,11 @@ from flask_restful import Resource, marshal_with, reqparse # type: ignore from flask_restful.inputs import int_range # type: ignore +from sqlalchemy.orm import Session from controllers.console import api from controllers.console.app.wraps import get_app_model from controllers.console.wraps import account_initialization_required, setup_required +from extensions.ext_database import db from fields.workflow_app_log_fields import workflow_app_log_pagination_fields from libs.login import login_required from models import App @@ -36,11 +38,12 @@ class WorkflowAppLogApi(Resource): # get paginate workflow app logs workflow_app_service = WorkflowAppService() - workflow_app_log_pagination = workflow_app_service.get_paginate_workflow_app_logs( - app_model=app_model, args=args - ) + with Session(db.engine) as session: + workflow_app_log_pagination = workflow_app_service.get_paginate_workflow_app_logs( + session=session, app_model=app_model, args=args + ) - return workflow_app_log_pagination + return workflow_app_log_pagination api.add_resource(WorkflowAppLogApi, "/apps//workflow-app-logs") diff --git a/api/controllers/service_api/app/workflow.py b/api/controllers/service_api/app/workflow.py index df637b025f..cdbea220ea 100644 --- a/api/controllers/service_api/app/workflow.py +++ b/api/controllers/service_api/app/workflow.py @@ -2,6 +2,7 @@ import logging from flask_restful import Resource, fields, marshal_with, reqparse # type: ignore from flask_restful.inputs import int_range # type: ignore +from sqlalchemy.orm import Session from werkzeug.exceptions import InternalServerError from controllers.service_api import api @@ -125,17 +126,20 @@ class WorkflowAppLogApi(Resource): parser = reqparse.RequestParser() parser.add_argument("keyword", type=str, location="args") parser.add_argument("status", type=str, choices=["succeeded", "failed", "stopped"], location="args") + parser.add_argument("created_at__before", type=str, location="args") + parser.add_argument("created_at__after", type=str, location="args") parser.add_argument("page", type=int_range(1, 99999), default=1, location="args") parser.add_argument("limit", type=int_range(1, 100), default=20, location="args") args = parser.parse_args() # get paginate workflow app logs workflow_app_service = WorkflowAppService() - workflow_app_log_pagination = workflow_app_service.get_paginate_workflow_app_logs( - app_model=app_model, args=args - ) + with Session(db.engine) as session: + workflow_app_log_pagination = workflow_app_service.get_paginate_workflow_app_logs( + session=session, app_model=app_model, args=args + ) - return workflow_app_log_pagination + return workflow_app_log_pagination api.add_resource(WorkflowRunApi, "/workflows/run") diff --git a/api/fields/workflow_app_log_fields.py b/api/fields/workflow_app_log_fields.py index c45b33597b..0c032ca435 100644 --- a/api/fields/workflow_app_log_fields.py +++ b/api/fields/workflow_app_log_fields.py @@ -20,5 +20,5 @@ workflow_app_log_pagination_fields = { "limit": fields.Integer(attribute="per_page"), "total": fields.Integer, "has_more": fields.Boolean(attribute="has_next"), - "data": fields.List(fields.Nested(workflow_app_log_partial_fields), attribute="items"), + "data": fields.List(fields.Nested(workflow_app_log_partial_fields)), } diff --git a/api/services/workflow_app_service.py b/api/services/workflow_app_service.py index ec0290a3ef..c992109000 100644 --- a/api/services/workflow_app_service.py +++ b/api/services/workflow_app_service.py @@ -1,31 +1,33 @@ import uuid from datetime import datetime -from flask_sqlalchemy.pagination import Pagination -from sqlalchemy import and_, or_ +from sqlalchemy import and_, func, or_, select +from sqlalchemy.orm import Session -from extensions.ext_database import db from models import App, EndUser, WorkflowAppLog, WorkflowRun from models.enums import CreatedByRole from models.workflow import WorkflowRunStatus class WorkflowAppService: - def get_paginate_workflow_app_logs(self, app_model: App, args: dict) -> Pagination: + def get_paginate_workflow_app_logs(self, *, session: Session, app_model: App, args: dict) -> dict: """ - Get paginate workflow app logs - :param app: app model + Get paginate workflow app logs using SQLAlchemy 2.0 style + :param app_model: app model :param args: request args - :return: + :param session: SQLAlchemy session, if None will use db + :return: Pagination object """ - query = db.select(WorkflowAppLog).where( + # Build base statement using SQLAlchemy 2.0 style + stmt = select(WorkflowAppLog).where( WorkflowAppLog.tenant_id == app_model.tenant_id, WorkflowAppLog.app_id == app_model.id ) status = WorkflowRunStatus.value_of(args.get("status", "")) if args.get("status") else None - keyword = args["keyword"] + keyword = args.get("keyword") + if keyword or status: - query = query.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id) + stmt = stmt.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id) if keyword: keyword_like_val = f"%{keyword[:30].encode('unicode_escape').decode('utf-8')}%".replace(r"\u", r"\\u") @@ -41,21 +43,21 @@ class WorkflowAppService: if keyword_uuid: keyword_conditions.append(WorkflowRun.id == keyword_uuid) - query = query.outerjoin( + stmt = stmt.outerjoin( EndUser, and_(WorkflowRun.created_by == EndUser.id, WorkflowRun.created_by_role == CreatedByRole.END_USER), - ).filter(or_(*keyword_conditions)) + ).where(or_(*keyword_conditions)) if status: - # join with workflow_run and filter by status - query = query.filter(WorkflowRun.status == status.value) + # filter by status + stmt = stmt.where(WorkflowRun.status == status.value) # Add time-based filtering created_at_before = args.get("created_at__before") if created_at_before: try: before_date = datetime.fromisoformat(created_at_before.replace("Z", "+00:00")) - query = query.filter(WorkflowAppLog.created_at <= before_date) + stmt = stmt.where(WorkflowAppLog.created_at <= before_date) except ValueError: pass # Ignore invalid date format @@ -63,15 +65,32 @@ class WorkflowAppService: if created_at_after: try: after_date = datetime.fromisoformat(created_at_after.replace("Z", "+00:00")) - query = query.filter(WorkflowAppLog.created_at >= after_date) + stmt = stmt.where(WorkflowAppLog.created_at >= after_date) except ValueError: pass # Ignore invalid date format - query = query.order_by(WorkflowAppLog.created_at.desc()) + stmt = stmt.order_by(WorkflowAppLog.created_at.desc()) - pagination = db.paginate(query, page=args["page"], per_page=args["limit"], error_out=False) + page = args["page"] + per_page = args["limit"] - return pagination + # Get total count using the same filters + count_stmt = select(func.count()).select_from(stmt.subquery()) + total = session.scalar(count_stmt) + + # Apply pagination limits + offset_stmt = stmt.offset((page - 1) * per_page).limit(per_page) + + # Execute query and get items + items = list(session.scalars(offset_stmt).all()) + + return { + "page": page, + "limit": per_page, + "total": total, + "has_more": total > page * per_page, + "data": items, + } @staticmethod def _safe_parse_uuid(value: str):