From 9632f7d51548b6c0e46d9fca5cc1c36cb40f20c7 Mon Sep 17 00:00:00 2001 From: jyong <718720800@qq.com> Date: Wed, 5 Mar 2025 14:46:48 +0800 Subject: [PATCH] fix metadata --- api/controllers/console/datasets/metadata.py | 16 +- .../knowledge_retrieval_node.py | 2 +- api/models/dataset.py | 4 +- api/services/metadata_service.py | 241 ++++++++++-------- 4 files changed, 151 insertions(+), 112 deletions(-) diff --git a/api/controllers/console/datasets/metadata.py b/api/controllers/console/datasets/metadata.py index d3e52daa5f..14183a1e67 100644 --- a/api/controllers/console/datasets/metadata.py +++ b/api/controllers/console/datasets/metadata.py @@ -48,12 +48,24 @@ class DatasetMetadataCreateApi(Resource): metadata = MetadataService.create_metadata(dataset_id_str, metadata_args) return metadata, 201 + @setup_required + @login_required + @account_initialization_required + @enterprise_license_required + def get(self, dataset_id): + dataset_id_str = str(dataset_id) + dataset = DatasetService.get_dataset(dataset_id_str) + if dataset is None: + raise NotFound("Dataset not found.") + return MetadataService.get_dataset_metadatas(dataset), 200 + class DatasetMetadataApi(Resource): @setup_required @login_required @account_initialization_required @enterprise_license_required + @marshal_with(dataset_metadata_fields) def patch(self, dataset_id, metadata_id): parser = reqparse.RequestParser() parser.add_argument("name", type=str, required=True, nullable=True, location="json") @@ -92,7 +104,7 @@ class DatasetMetadataBuiltInFieldApi(Resource): @enterprise_license_required def get(self): built_in_fields = MetadataService.get_built_in_fields() - return built_in_fields, 200 + return {"fields": built_in_fields}, 200 class DatasetMetadataBuiltInFieldActionApi(Resource): @@ -139,5 +151,5 @@ class DocumentMetadataEditApi(Resource): api.add_resource(DatasetMetadataCreateApi, "/datasets/<uuid:dataset_id>/metadata") api.add_resource(DatasetMetadataApi, "/datasets/<uuid:dataset_id>/metadata/<uuid:metadata_id>") 
api.add_resource(DatasetMetadataBuiltInFieldApi, "/datasets/metadata/built-in") -api.add_resource(DatasetMetadataBuiltInFieldActionApi, "/datasets/metadata/built-in/<string:action>") +api.add_resource(DatasetMetadataBuiltInFieldActionApi, "/datasets/<uuid:dataset_id>/metadata/built-in/<string:action>") api.add_resource(DocumentMetadataEditApi, "/datasets/<uuid:dataset_id>/documents/metadata") diff --git a/api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py b/api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py index 273a6773a0..ddc2c5ce1d 100644 --- a/api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py +++ b/api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py @@ -2,7 +2,7 @@ import json import logging from collections import defaultdict from collections.abc import Mapping, Sequence -from typing import Any, cast, Optional +from typing import Any, Optional, cast from sqlalchemy import func diff --git a/api/models/dataset.py b/api/models/dataset.py index 86408aa519..4bd0b0ea6f 100644 --- a/api/models/dataset.py +++ b/api/models/dataset.py @@ -474,7 +474,7 @@ class Document(db.Model): # type: ignore[name-defined] "id": "built-in", "name": BuiltInField.upload_date, "type": "date", - "value": self.created_at, + "value": self.created_at.timestamp(), } ) built_in_fields.append( @@ -482,7 +482,7 @@ class Document(db.Model): # type: ignore[name-defined] "id": "built-in", "name": BuiltInField.last_update_date, "type": "date", - "value": self.updated_at, + "value": self.updated_at.timestamp(), } ) built_in_fields.append( diff --git a/api/services/metadata_service.py b/api/services/metadata_service.py index 9877e09fdd..45814ee066 100644 --- a/api/services/metadata_service.py +++ b/api/services/metadata_service.py @@ -1,4 +1,5 @@ import datetime +import logging from typing import Optional from flask_login import current_user # type: ignore @@ -12,13 +13,13 @@ from services.entities.knowledge_entities.knowledge_entities import ( MetadataArgs, MetadataOperationData, 
) -from tasks.update_documents_metadata_task import update_documents_metadata_task class MetadataService: @staticmethod def create_metadata(dataset_id: str, metadata_args: MetadataArgs) -> DatasetMetadata: metadata = DatasetMetadata( + tenant_id=current_user.current_tenant_id, dataset_id=dataset_id, type=metadata_args.type, name=metadata_args.name, @@ -31,49 +32,55 @@ class MetadataService: @staticmethod def update_metadata_name(dataset_id: str, metadata_id: str, name: str) -> DatasetMetadata: lock_key = f"dataset_metadata_lock_{dataset_id}" - MetadataService.knowledge_base_metadata_lock_check(dataset_id, None) - metadata = DatasetMetadata.query.filter_by(id=metadata_id).first() - if metadata is None: - raise ValueError("Metadata not found.") - old_name = metadata.name - metadata.name = name - metadata.updated_by = current_user.id - metadata.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) - - # update related documents - documents = [] - dataset_metadata_bindings = DatasetMetadataBinding.query.filter_by(metadata_id=metadata_id).all() - if dataset_metadata_bindings: - document_ids = [binding.document_id for binding in dataset_metadata_bindings] - documents = DocumentService.get_document_by_ids(document_ids) - for document in documents: - document.doc_metadata[name] = document.doc_metadata.pop(old_name) - db.session.add(document) - db.session.commit() - if document_ids: - update_documents_metadata_task.delay(dataset_id, document_ids, lock_key) - return metadata + try: + MetadataService.knowledge_base_metadata_lock_check(dataset_id, None) + metadata = DatasetMetadata.query.filter_by(id=metadata_id).first() + if metadata is None: + raise ValueError("Metadata not found.") + old_name = metadata.name + metadata.name = name + metadata.updated_by = current_user.id + metadata.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) + # update related documents + dataset_metadata_bindings = 
DatasetMetadataBinding.query.filter_by(metadata_id=metadata_id).all() + if dataset_metadata_bindings: + document_ids = [binding.document_id for binding in dataset_metadata_bindings] + documents = DocumentService.get_document_by_ids(document_ids) + for document in documents: + document.doc_metadata[name] = document.doc_metadata.pop(old_name) + db.session.add(document) + db.session.commit() + return metadata + except Exception: + logging.exception("Update metadata name failed") + finally: + redis_client.delete(lock_key) + @staticmethod def delete_metadata(dataset_id: str, metadata_id: str): lock_key = f"dataset_metadata_lock_{dataset_id}" - MetadataService.knowledge_base_metadata_lock_check(dataset_id, None) - metadata = DatasetMetadata.query.filter_by(id=metadata_id).first() - if metadata is None: - raise ValueError("Metadata not found.") - db.session.delete(metadata) + try: + MetadataService.knowledge_base_metadata_lock_check(dataset_id, None) + metadata = DatasetMetadata.query.filter_by(id=metadata_id).first() + if metadata is None: + raise ValueError("Metadata not found.") + db.session.delete(metadata) - # delete related documents - dataset_metadata_bindings = DatasetMetadataBinding.query.filter_by(metadata_id=metadata_id).all() - if dataset_metadata_bindings: - document_ids = [binding.document_id for binding in dataset_metadata_bindings] - documents = DocumentService.get_document_by_ids(document_ids) - for document in documents: - document.doc_metadata.pop(metadata.name) - db.session.add(document) - db.session.commit() - if document_ids: - update_documents_metadata_task.delay(dataset_id, document_ids, lock_key) + # delete related documents + dataset_metadata_bindings = DatasetMetadataBinding.query.filter_by(metadata_id=metadata_id).all() + if dataset_metadata_bindings: + document_ids = [binding.document_id for binding in dataset_metadata_bindings] + documents = DocumentService.get_document_by_ids(document_ids) + for document in documents: + 
document.doc_metadata.pop(metadata.name) + db.session.add(document) + db.session.commit() + return metadata + except Exception: + logging.exception("Delete metadata failed") + finally: + redis_client.delete(lock_key) @staticmethod def get_built_in_fields(): @@ -87,86 +94,91 @@ class MetadataService: @staticmethod def enable_built_in_field(dataset: Dataset): - if dataset.built_in_fields: + if dataset.built_in_field_enabled: return lock_key = f"dataset_metadata_lock_{dataset.id}" - MetadataService.knowledge_base_metadata_lock_check(dataset.id, None) - dataset.built_in_fields = True - db.session.add(dataset) - documents = DocumentService.get_working_documents_by_dataset_id(dataset.id) - document_ids = [] - if documents: - for document in documents: - document.doc_metadata[BuiltInField.document_name] = document.name - document.doc_metadata[BuiltInField.uploader] = document.uploader - document.doc_metadata[BuiltInField.upload_date] = document.upload_date.strftime("%Y-%m-%d %H:%M:%S") - document.doc_metadata[BuiltInField.last_update_date] = document.last_update_date.strftime( - "%Y-%m-%d %H:%M:%S" - ) - document.doc_metadata[BuiltInField.source] = document.data_source_type - db.session.add(document) - document_ids.append(document.id) - db.session.commit() - if document_ids: - update_documents_metadata_task.delay(dataset.id, document_ids, lock_key) + try: + MetadataService.knowledge_base_metadata_lock_check(dataset.id, None) + dataset.built_in_field_enabled = True + db.session.add(dataset) + documents = DocumentService.get_working_documents_by_dataset_id(dataset.id) + if documents: + for document in documents: + if not document.doc_metadata: + document.doc_metadata = {} + document.doc_metadata[BuiltInField.document_name] = document.name + document.doc_metadata[BuiltInField.uploader] = document.uploader + document.doc_metadata[BuiltInField.upload_date] = document.upload_date.timestamp() + document.doc_metadata[BuiltInField.last_update_date] = 
document.last_update_date.timestamp() + document.doc_metadata[BuiltInField.source] = document.data_source_type + db.session.add(document) + db.session.commit() + except Exception: + logging.exception("Enable built-in field failed") + finally: + redis_client.delete(lock_key) @staticmethod def disable_built_in_field(dataset: Dataset): - if not dataset.built_in_fields: + if not dataset.built_in_field_enabled: return lock_key = f"dataset_metadata_lock_{dataset.id}" - MetadataService.knowledge_base_metadata_lock_check(dataset.id, None) - dataset.built_in_fields = False - db.session.add(dataset) - documents = DocumentService.get_working_documents_by_dataset_id(dataset.id) - document_ids = [] - if documents: - for document in documents: - document.doc_metadata.pop(BuiltInField.document_name) - document.doc_metadata.pop(BuiltInField.uploader) - document.doc_metadata.pop(BuiltInField.upload_date) - document.doc_metadata.pop(BuiltInField.last_update_date) - document.doc_metadata.pop(BuiltInField.source) - db.session.add(document) - document_ids.append(document.id) - db.session.commit() - if document_ids: - update_documents_metadata_task.delay(dataset.id, document_ids, lock_key) + try: + MetadataService.knowledge_base_metadata_lock_check(dataset.id, None) + dataset.built_in_field_enabled = False + db.session.add(dataset) + documents = DocumentService.get_working_documents_by_dataset_id(dataset.id) + document_ids = [] + if documents: + for document in documents: + document.doc_metadata.pop(BuiltInField.document_name) + document.doc_metadata.pop(BuiltInField.uploader) + document.doc_metadata.pop(BuiltInField.upload_date) + document.doc_metadata.pop(BuiltInField.last_update_date) + document.doc_metadata.pop(BuiltInField.source) + db.session.add(document) + document_ids.append(document.id) + db.session.commit() + except Exception: + logging.exception("Disable built-in field failed") + finally: + redis_client.delete(lock_key) @staticmethod def update_documents_metadata(dataset: 
Dataset, metadata_args: MetadataOperationData): for operation in metadata_args.operation_data: lock_key = f"document_metadata_lock_{operation.document_id}" - MetadataService.knowledge_base_metadata_lock_check(None, operation.document_id) - document = DocumentService.get_document(operation.document_id) - if document is None: - raise ValueError("Document not found.") - document.doc_metadata = {} - for metadata_value in metadata_args.fields: - document.doc_metadata[metadata_value.name] = metadata_value.value - if dataset.built_in_fields: - document.doc_metadata[BuiltInField.document_name] = document.name - document.doc_metadata[BuiltInField.uploader] = document.uploader - document.doc_metadata[BuiltInField.upload_date] = document.upload_date.strftime("%Y-%m-%d %H:%M:%S") - document.doc_metadata[BuiltInField.last_update_date] = document.last_update_date.strftime( - "%Y-%m-%d %H:%M:%S" - ) - document.doc_metadata[BuiltInField.source] = document.data_source_type - # deal metadata bindding - DatasetMetadataBinding.query.filter_by(document_id=operation.document_id).delete() - for metadata_value in operation.metadata_list: - dataset_metadata_binding = DatasetMetadataBinding( - tenant_id=current_user.tenant_id, - dataset_id=dataset.id, - document_id=operation.document_id, - metadata_id=metadata_value.id, - created_by=current_user.id, - ) - db.session.add(dataset_metadata_binding) - db.session.add(document) - db.session.commit() - - update_documents_metadata_task.delay(dataset.id, [document.id], lock_key) + try: + MetadataService.knowledge_base_metadata_lock_check(None, operation.document_id) + document = DocumentService.get_document(operation.document_id) + if document is None: + raise ValueError("Document not found.") + document.doc_metadata = {} + for metadata_value in metadata_args.fields: + document.doc_metadata[metadata_value.name] = metadata_value.value + if dataset.built_in_fields: + document.doc_metadata[BuiltInField.document_name] = document.name + 
document.doc_metadata[BuiltInField.uploader] = document.uploader + document.doc_metadata[BuiltInField.upload_date] = document.upload_date.timestamp() + document.doc_metadata[BuiltInField.last_update_date] = document.last_update_date.timestamp() + document.doc_metadata[BuiltInField.source] = document.data_source_type + # deal metadata binding + DatasetMetadataBinding.query.filter_by(document_id=operation.document_id).delete() + for metadata_value in operation.metadata_list: + dataset_metadata_binding = DatasetMetadataBinding( + tenant_id=current_user.current_tenant_id, + dataset_id=dataset.id, + document_id=operation.document_id, + metadata_id=metadata_value.id, + created_by=current_user.id, + ) + db.session.add(dataset_metadata_binding) + db.session.add(document) + db.session.commit() + except Exception: + logging.exception("Update documents metadata failed") + finally: + redis_client.delete(lock_key) @staticmethod def knowledge_base_metadata_lock_check(dataset_id: Optional[str], document_id: Optional[str]): @@ -180,3 +192,18 @@ class MetadataService: if redis_client.get(lock_key): raise ValueError("Another document metadata operation is running, please wait a moment.") redis_client.set(lock_key, 1, ex=3600) + + @staticmethod + def get_dataset_metadatas(dataset: Dataset): + return { + "doc_metadata": [ + { + "id": item.get("id"), + "name": item.get("name"), + "type": item.get("type"), + "count": DatasetMetadataBinding.query.filter_by(metadata_id=item.get("id"), dataset_id=dataset.id).count(), + } + for item in dataset.doc_metadata or [] + ], + "built_in_field_enabled": dataset.built_in_field_enabled, + }