feat: implement SQLAlchemy 2.0 style pagination for workflow app logs with time-based filtering

Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
-LAN- 2025-03-04 15:34:58 +08:00
parent e8243c566f
commit 6460282f48
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
4 changed files with 54 additions and 28 deletions

View File

@ -1,9 +1,11 @@
from flask_restful import Resource, marshal_with, reqparse # type: ignore from flask_restful import Resource, marshal_with, reqparse # type: ignore
from flask_restful.inputs import int_range # type: ignore from flask_restful.inputs import int_range # type: ignore
from sqlalchemy.orm import Session
from controllers.console import api from controllers.console import api
from controllers.console.app.wraps import get_app_model from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required from controllers.console.wraps import account_initialization_required, setup_required
from extensions.ext_database import db
from fields.workflow_app_log_fields import workflow_app_log_pagination_fields from fields.workflow_app_log_fields import workflow_app_log_pagination_fields
from libs.login import login_required from libs.login import login_required
from models import App from models import App
@ -36,11 +38,12 @@ class WorkflowAppLogApi(Resource):
# get paginate workflow app logs # get paginate workflow app logs
workflow_app_service = WorkflowAppService() workflow_app_service = WorkflowAppService()
workflow_app_log_pagination = workflow_app_service.get_paginate_workflow_app_logs( with Session(db.engine) as session:
app_model=app_model, args=args workflow_app_log_pagination = workflow_app_service.get_paginate_workflow_app_logs(
) session=session, app_model=app_model, args=args
)
return workflow_app_log_pagination return workflow_app_log_pagination
api.add_resource(WorkflowAppLogApi, "/apps/<uuid:app_id>/workflow-app-logs") api.add_resource(WorkflowAppLogApi, "/apps/<uuid:app_id>/workflow-app-logs")

View File

@ -2,6 +2,7 @@ import logging
from flask_restful import Resource, fields, marshal_with, reqparse # type: ignore from flask_restful import Resource, fields, marshal_with, reqparse # type: ignore
from flask_restful.inputs import int_range # type: ignore from flask_restful.inputs import int_range # type: ignore
from sqlalchemy.orm import Session
from werkzeug.exceptions import InternalServerError from werkzeug.exceptions import InternalServerError
from controllers.service_api import api from controllers.service_api import api
@ -125,17 +126,20 @@ class WorkflowAppLogApi(Resource):
parser = reqparse.RequestParser() parser = reqparse.RequestParser()
parser.add_argument("keyword", type=str, location="args") parser.add_argument("keyword", type=str, location="args")
parser.add_argument("status", type=str, choices=["succeeded", "failed", "stopped"], location="args") parser.add_argument("status", type=str, choices=["succeeded", "failed", "stopped"], location="args")
parser.add_argument("created_at__before", type=str, location="args")
parser.add_argument("created_at__after", type=str, location="args")
parser.add_argument("page", type=int_range(1, 99999), default=1, location="args") parser.add_argument("page", type=int_range(1, 99999), default=1, location="args")
parser.add_argument("limit", type=int_range(1, 100), default=20, location="args") parser.add_argument("limit", type=int_range(1, 100), default=20, location="args")
args = parser.parse_args() args = parser.parse_args()
# get paginate workflow app logs # get paginate workflow app logs
workflow_app_service = WorkflowAppService() workflow_app_service = WorkflowAppService()
workflow_app_log_pagination = workflow_app_service.get_paginate_workflow_app_logs( with Session(db.engine) as session:
app_model=app_model, args=args workflow_app_log_pagination = workflow_app_service.get_paginate_workflow_app_logs(
) session=session, app_model=app_model, args=args
)
return workflow_app_log_pagination return workflow_app_log_pagination
api.add_resource(WorkflowRunApi, "/workflows/run") api.add_resource(WorkflowRunApi, "/workflows/run")

View File

@ -20,5 +20,5 @@ workflow_app_log_pagination_fields = {
"limit": fields.Integer(attribute="per_page"), "limit": fields.Integer(attribute="per_page"),
"total": fields.Integer, "total": fields.Integer,
"has_more": fields.Boolean(attribute="has_next"), "has_more": fields.Boolean(attribute="has_next"),
"data": fields.List(fields.Nested(workflow_app_log_partial_fields), attribute="items"), "data": fields.List(fields.Nested(workflow_app_log_partial_fields)),
} }

View File

@ -1,31 +1,33 @@
import uuid import uuid
from datetime import datetime from datetime import datetime
from flask_sqlalchemy.pagination import Pagination from sqlalchemy import and_, func, or_, select
from sqlalchemy import and_, or_ from sqlalchemy.orm import Session
from extensions.ext_database import db
from models import App, EndUser, WorkflowAppLog, WorkflowRun from models import App, EndUser, WorkflowAppLog, WorkflowRun
from models.enums import CreatedByRole from models.enums import CreatedByRole
from models.workflow import WorkflowRunStatus from models.workflow import WorkflowRunStatus
class WorkflowAppService: class WorkflowAppService:
def get_paginate_workflow_app_logs(self, app_model: App, args: dict) -> Pagination: def get_paginate_workflow_app_logs(self, *, session: Session, app_model: App, args: dict) -> dict:
""" """
Get paginate workflow app logs Get paginate workflow app logs using SQLAlchemy 2.0 style
:param app: app model :param app_model: app model
:param args: request args :param args: request args
:return: :param session: SQLAlchemy session, if None will use db
:return: Pagination object
""" """
query = db.select(WorkflowAppLog).where( # Build base statement using SQLAlchemy 2.0 style
stmt = select(WorkflowAppLog).where(
WorkflowAppLog.tenant_id == app_model.tenant_id, WorkflowAppLog.app_id == app_model.id WorkflowAppLog.tenant_id == app_model.tenant_id, WorkflowAppLog.app_id == app_model.id
) )
status = WorkflowRunStatus.value_of(args.get("status", "")) if args.get("status") else None status = WorkflowRunStatus.value_of(args.get("status", "")) if args.get("status") else None
keyword = args["keyword"] keyword = args.get("keyword")
if keyword or status: if keyword or status:
query = query.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id) stmt = stmt.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id)
if keyword: if keyword:
keyword_like_val = f"%{keyword[:30].encode('unicode_escape').decode('utf-8')}%".replace(r"\u", r"\\u") keyword_like_val = f"%{keyword[:30].encode('unicode_escape').decode('utf-8')}%".replace(r"\u", r"\\u")
@ -41,21 +43,21 @@ class WorkflowAppService:
if keyword_uuid: if keyword_uuid:
keyword_conditions.append(WorkflowRun.id == keyword_uuid) keyword_conditions.append(WorkflowRun.id == keyword_uuid)
query = query.outerjoin( stmt = stmt.outerjoin(
EndUser, EndUser,
and_(WorkflowRun.created_by == EndUser.id, WorkflowRun.created_by_role == CreatedByRole.END_USER), and_(WorkflowRun.created_by == EndUser.id, WorkflowRun.created_by_role == CreatedByRole.END_USER),
).filter(or_(*keyword_conditions)) ).where(or_(*keyword_conditions))
if status: if status:
# join with workflow_run and filter by status # filter by status
query = query.filter(WorkflowRun.status == status.value) stmt = stmt.where(WorkflowRun.status == status.value)
# Add time-based filtering # Add time-based filtering
created_at_before = args.get("created_at__before") created_at_before = args.get("created_at__before")
if created_at_before: if created_at_before:
try: try:
before_date = datetime.fromisoformat(created_at_before.replace("Z", "+00:00")) before_date = datetime.fromisoformat(created_at_before.replace("Z", "+00:00"))
query = query.filter(WorkflowAppLog.created_at <= before_date) stmt = stmt.where(WorkflowAppLog.created_at <= before_date)
except ValueError: except ValueError:
pass # Ignore invalid date format pass # Ignore invalid date format
@ -63,15 +65,32 @@ class WorkflowAppService:
if created_at_after: if created_at_after:
try: try:
after_date = datetime.fromisoformat(created_at_after.replace("Z", "+00:00")) after_date = datetime.fromisoformat(created_at_after.replace("Z", "+00:00"))
query = query.filter(WorkflowAppLog.created_at >= after_date) stmt = stmt.where(WorkflowAppLog.created_at >= after_date)
except ValueError: except ValueError:
pass # Ignore invalid date format pass # Ignore invalid date format
query = query.order_by(WorkflowAppLog.created_at.desc()) stmt = stmt.order_by(WorkflowAppLog.created_at.desc())
pagination = db.paginate(query, page=args["page"], per_page=args["limit"], error_out=False) page = args["page"]
per_page = args["limit"]
return pagination # Get total count using the same filters
count_stmt = select(func.count()).select_from(stmt.subquery())
total = session.scalar(count_stmt)
# Apply pagination limits
offset_stmt = stmt.offset((page - 1) * per_page).limit(per_page)
# Execute query and get items
items = list(session.scalars(offset_stmt).all())
return {
"page": page,
"limit": per_page,
"total": total,
"has_more": total > page * per_page,
"data": items,
}
@staticmethod @staticmethod
def _safe_parse_uuid(value: str): def _safe_parse_uuid(value: str):