Test Cases (#2993)

### What problem does this PR solve?

Adds SDK test cases for datasets, chats, documents, and sessions: the `TestSdk`-style test classes are converted to plain pytest functions that obtain a per-run API key from a shared session-scoped fixture, and the server-side validation and Python SDK are aligned with what the tests expect.

### Type of change

- [x] Refactoring

Co-authored-by: liuhua <10215101452@stu.ecun.edu.cn>
Committed by liuhua on 2024-10-23 22:58:27 +08:00 via GitHub
commit 50b425cf89 (parent 2174c350be)
11 changed files with 556 additions and 418 deletions
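
At the core of the refactor: the hard-coded `API_KEY`/`HOST_ADDRESS` module (`common.py`, deleted below) is replaced by a session-scoped pytest fixture (see the conftest hunk below) that registers a throwaway user, logs in, and mints a token once per test run. A minimal sketch of the consuming pattern, as the rewritten tests use it:

```python
# Sketch of the new pattern; see the conftest hunk below for the real fixture.
from ragflow import RAGFlow

HOST_ADDRESS = 'http://127.0.0.1:9380'

def test_something(get_api_key_fixture):  # pytest injects the fixture by name
    API_KEY = get_api_key_fixture         # the token string the fixture returns
    rag = RAGFlow(API_KEY, HOST_ADDRESS)
```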

View File

@@ -37,9 +37,9 @@ def create(tenant_id):
     language = req.get("language")
     chunk_method = req.get("chunk_method")
     parser_config = req.get("parser_config")
-    valid_permission = {"me", "team"}
-    valid_language ={"Chinese", "English"}
-    valid_chunk_method = {"naive","manual","qa","table","paper","book","laws","presentation","picture","one","knowledge_graph","email"}
+    valid_permission = ["me", "team"]
+    valid_language =["Chinese", "English"]
+    valid_chunk_method = ["naive","manual","qa","table","paper","book","laws","presentation","picture","one","knowledge_graph","email"]
     check_validation=valid(permission,valid_permission,language,valid_language,chunk_method,valid_chunk_method)
     if check_validation:
         return check_validation
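
A plausible reason for moving the `valid_*` containers from sets to lists (not stated in the PR, so an assumption): lists interpolate into the validation error message in a stable, declared order, which the SDK tests below compare against verbatim:

```python
valid_chunk_method = ["naive", "manual", "qa"]
chunk_method = "invalid_chunk_method"
print(f"{chunk_method} is not in {valid_chunk_method}")
# -> invalid_chunk_method is not in ['naive', 'manual', 'qa']
# With a set, the braces and arbitrary ordering ({'qa', 'naive', ...}) would
# make an exact string assertion unreliable.
```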
@@ -47,10 +47,8 @@ def create(tenant_id):
     if "tenant_id" in req:
         return get_error_data_result(
             retmsg="`tenant_id` must not be provided")
-    chunk_count=req.get("chunk_count")
-    document_count=req.get("document_count")
-    if chunk_count or document_count:
-        return get_error_data_result(retmsg="`chunk_count` or `document_count` must be 0 or not be provided")
+    if "chunk_count" in req or "document_count" in req:
+        return get_error_data_result(retmsg="`chunk_count` or `document_count` must not be provided")
     if "name" not in req:
         return get_error_data_result(
             retmsg="`name` is not empty!")
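
The new membership check also changes behavior, not just the message: `req.get("chunk_count")` yields `0` for an explicit zero, which is falsy, so the old truthiness guard silently accepted `{"chunk_count": 0}` while the new guard rejects any client-supplied value:

```python
req = {"chunk_count": 0}
print(bool(req.get("chunk_count")))  # False -- the old check let an explicit 0 through
print("chunk_count" in req)          # True  -- the new check rejects any provided value
```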
@@ -123,10 +121,10 @@ def update(tenant_id,dataset_id):
     language = req.get("language")
     chunk_method = req.get("chunk_method")
     parser_config = req.get("parser_config")
-    valid_permission = {"me", "team"}
-    valid_language = {"Chinese", "English"}
-    valid_chunk_method = {"naive", "manual", "qa", "table", "paper", "book", "laws", "presentation", "picture", "one",
-                          "knowledge_graph", "email"}
+    valid_permission = ["me", "team"]
+    valid_language = ["Chinese", "English"]
+    valid_chunk_method = ["naive", "manual", "qa", "table", "paper", "book", "laws", "presentation", "picture", "one",
+                          "knowledge_graph", "email"]
     check_validation = valid(permission, valid_permission, language, valid_language, chunk_method, valid_chunk_method)
     if check_validation:
         return check_validation

View File

@@ -44,6 +44,7 @@ from rag.nlp import search
 from rag.utils import rmSpace
 from rag.utils.es_conn import ELASTICSEARCH
 from rag.utils.storage_factory import STORAGE_IMPL
+import os

 MAXIMUM_OF_UPLOADING_FILES = 256

View File

@@ -337,7 +337,7 @@ def valid(permission,valid_permission,language,valid_language,chunk_method,valid_chunk_method):

 def valid_parameter(parameter,valid_values):
     if parameter and parameter not in valid_values:
-        return get_error_data_result(f"{parameter} not in {valid_values}")
+        return get_error_data_result(f"{parameter} is not in {valid_values}")

 def get_parser_config(chunk_method,parser_config):
     if parser_config:

View File

@@ -51,6 +51,8 @@ class RAGFlow:
     def create_dataset(self, name: str, avatar: str = "", description: str = "", language: str = "English",
                        permission: str = "me",chunk_method: str = "naive",
                        parser_config: DataSet.ParserConfig = None) -> DataSet:
+        if parser_config:
+            parser_config = parser_config.to_json()
         res = self.post("/dataset",
                        {"name": name, "avatar": avatar, "description": description, "language": language,
                         "permission": permission, "chunk_method": chunk_method,
@@ -91,7 +93,7 @@
                     llm: Chat.LLM = None, prompt: Chat.Prompt = None) -> Chat:
         dataset_list = []
         for dataset in datasets:
-            dataset_list.append(dataset.to_json())
+            dataset_list.append(dataset.id)
         if llm is None:
             llm = Chat.LLM(self, {"model_name": None,
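
Two SDK-side fixes travel together here: `create_dataset` now serializes `parser_config` with `.to_json()` before the POST, and `create_chat` sends only dataset IDs instead of each dataset's full JSON. A usage sketch built from the calls in this diff (key and host are placeholders):

```python
from ragflow import RAGFlow

rag = RAGFlow("ragflow-<your-api-key>", "http://127.0.0.1:9380")
ds = rag.create_dataset(name="demo")                # parser_config omitted -> server default
chat = rag.create_chat("demo_chat", datasets=[ds])  # only ds.id goes over the wire now
```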

View File

@@ -1,4 +0,0 @@
-API_KEY = 'ragflow-NiYmZjNTVjODYwNzExZWZiODEwMDI0Mm'
-HOST_ADDRESS = 'http://127.0.0.1:9380'

View File

@@ -0,0 +1,52 @@
+import pytest
+import requests
+import string
+import random
+
+HOST_ADDRESS = 'http://127.0.0.1:9380'
+
+def generate_random_email():
+    return 'user_' + ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))+'@1.com'
+
+EMAIL = generate_random_email()
+# password is "123"
+PASSWORD='''ctAseGvejiaSWWZ88T/m4FQVOpQyUvP+x7sXtdv3feqZACiQleuewkUi35E16wSd5C5QcnkkcV9cYc8TKPTRZlxappDuirxghxoOvFcJxFU4ixLsD
+fN33jCHRoDUW81IH9zjij/vaw8IbVyb6vuwg6MX6inOEBRRzVbRYxXOu1wkWY6SsI8X70oF9aeLFp/PzQpjoe/YbSqpTq8qqrmHzn9vO+yvyYyvmDsphXe
+X8f7fp9c7vUsfOCkM+gHY3PadG+QHa7KI7mzTKgUTZImK6BZtfRBATDTthEUbbaTewY4H0MnWiCeeDhcbeQao6cFy1To8pE3RpmxnGnS8BsBn8w=='''
+
+def get_email():
+    return EMAIL
+
+def register():
+    url = HOST_ADDRESS + "/v1/user/register"
+    name = "user"
+    register_data = {"email":EMAIL,"nickname":name,"password":PASSWORD}
+    res = requests.post(url=url,json=register_data)
+    res = res.json()
+    if res.get("retcode") != 0:
+        raise Exception(res.get("retmsg"))
+
+def login():
+    url = HOST_ADDRESS + "/v1/user/login"
+    login_data = {"email":EMAIL,"password":PASSWORD}
+    response=requests.post(url=url,json=login_data)
+    res = response.json()
+    if res.get("retcode")!=0:
+        raise Exception(res.get("retmsg"))
+    auth = response.headers["Authorization"]
+    return auth
+
+@pytest.fixture(scope="session")
+def get_api_key_fixture():
+    register()
+    auth = login()
+    url = HOST_ADDRESS + "/v1/system/new_token"
+    auth = {"Authorization": auth}
+    response = requests.post(url=url,headers=auth)
+    res = response.json()
+    if res.get("retcode") != 0:
+        raise Exception(res.get("retmsg"))
+    return res["data"].get("token")

View File

@@ -1,57 +1,67 @@
 from ragflow import RAGFlow, Chat
-from common import API_KEY, HOST_ADDRESS
-from test_sdkbase import TestSdk
+from xgboost.testing import datasets
+import time
+HOST_ADDRESS = 'http://127.0.0.1:9380'

-class TestChat(TestSdk):
-    def test_create_chat_with_success(self):
-        """
-        Test creating an chat with success
-        """
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        kb = rag.create_dataset(name="test_create_chat")
-        chat = rag.create_chat("test_create", datasets=[kb])
-        if isinstance(chat, Chat):
-            assert chat.name == "test_create", "Name does not match."
-        else:
-            assert False, f"Failed to create chat, error: {chat}"
+def test_create_chat_with_name(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    kb = rag.create_dataset(name="test_create_chat")
+    displayed_name = "ragflow.txt"
+    with open("./ragflow.txt","rb") as file:
+        blob = file.read()
+    document = {"displayed_name":displayed_name,"blob":blob}
+    documents = []
+    documents.append(document)
+    doc_ids = []
+    docs= kb.upload_documents(documents)
+    for doc in docs:
+        doc_ids.append(doc.id)
+    kb.async_parse_documents(doc_ids)
+    time.sleep(60)
+    rag.create_chat("test_create", datasets=[kb])

-    def test_update_chat_with_success(self):
-        """
-        Test updating an chat with success.
-        """
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        kb = rag.create_dataset(name="test_update_chat")
-        chat = rag.create_chat("test_update", datasets=[kb])
-        if isinstance(chat, Chat):
-            assert chat.name == "test_update", "Name does not match."
-            res=chat.update({"name":"new_chat"})
-            assert res is None, f"Failed to update chat, error: {res}"
-        else:
-            assert False, f"Failed to create chat, error: {chat}"
+def test_update_chat_with_name(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    kb = rag.create_dataset(name="test_update_chat")
+    displayed_name = "ragflow.txt"
+    with open("./ragflow.txt", "rb") as file:
+        blob = file.read()
+    document = {"displayed_name": displayed_name, "blob": blob}
+    documents = []
+    documents.append(document)
+    doc_ids = []
+    docs = kb.upload_documents(documents)
+    for doc in docs:
+        doc_ids.append(doc.id)
+    kb.async_parse_documents(doc_ids)
+    time.sleep(60)
+    chat = rag.create_chat("test_update", datasets=[kb])
+    chat.update({"name": "new_chat"})

-    def test_delete_chats_with_success(self):
-        """
-        Test deleting an chat with success
-        """
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        kb = rag.create_dataset(name="test_delete_chat")
-        chat = rag.create_chat("test_delete", datasets=[kb])
-        if isinstance(chat, Chat):
-            assert chat.name == "test_delete", "Name does not match."
-            res = rag.delete_chats(ids=[chat.id])
-            assert res is None, f"Failed to delete chat, error: {res}"
-        else:
-            assert False, f"Failed to create chat, error: {chat}"
+def test_delete_chats_with_success(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    kb = rag.create_dataset(name="test_delete_chat")
+    displayed_name = "ragflow.txt"
+    with open("./ragflow.txt", "rb") as file:
+        blob = file.read()
+    document = {"displayed_name": displayed_name, "blob": blob}
+    documents = []
+    documents.append(document)
+    doc_ids = []
+    docs = kb.upload_documents(documents)
+    for doc in docs:
+        doc_ids.append(doc.id)
+    kb.async_parse_documents(doc_ids)
+    time.sleep(60)
+    chat = rag.create_chat("test_delete", datasets=[kb])
+    rag.delete_chats(ids=[chat.id])

-    def test_list_chats_with_success(self):
-        """
-        Test listing chats with success
-        """
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        list_chats = rag.list_chats()
-        assert len(list_chats) > 0, "Do not exist any chat"
-        for chat in list_chats:
-            assert isinstance(chat, Chat), "Existence type is not chat."
+def test_list_chats_with_success(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    rag.list_chats()
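
The upload-then-parse preamble (read `ragflow.txt`, upload, `async_parse_documents`, `time.sleep(60)`) repeats in nearly every test in this file and in the session tests below. A shared helper would cut the duplication; `ingest_sample_document` is a hypothetical name, and the sketch uses only SDK calls the diff already contains (the fixed sleep is kept as-is, though polling would be more robust):

```python
import time

def ingest_sample_document(kb, path="./ragflow.txt", wait_seconds=60):
    """Upload one document to dataset `kb`, trigger parsing, and wait."""
    with open(path, "rb") as f:
        blob = f.read()
    docs = kb.upload_documents([{"displayed_name": "ragflow.txt", "blob": blob}])
    kb.async_parse_documents([doc.id for doc in docs])
    time.sleep(wait_seconds)  # fixed wait, mirroring the tests
```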

View File

@@ -1,53 +1,54 @@
-from ragflow import RAGFlow, DataSet
-from common import API_KEY, HOST_ADDRESS
-from test_sdkbase import TestSdk
+from ragflow import RAGFlow
+import random
+import pytest
+HOST_ADDRESS = 'http://127.0.0.1:9380'

-class TestDataset(TestSdk):
-    def test_create_dataset_with_success(self):
-        """
-        Test creating a dataset with success
-        """
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        ds = rag.create_dataset("God")
-        if isinstance(ds, DataSet):
-            assert ds.name == "God", "Name does not match."
-        else:
-            assert False, f"Failed to create dataset, error: {ds}"
+def test_create_dataset_with_name(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    rag.create_dataset("test_create_dataset_with_name")
+
+def test_create_dataset_with_duplicated_name(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    with pytest.raises(Exception) as exc_info:
+        rag.create_dataset("test_create_dataset_with_name")
+    assert str(exc_info.value) == "Duplicated dataset name in creating dataset."
+
+def test_create_dataset_with_random_chunk_method(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    valid_chunk_methods = ["naive","manual","qa","table","paper","book","laws","presentation","picture","one","knowledge_graph","email"]
+    random_chunk_method = random.choice(valid_chunk_methods)
+    rag.create_dataset("test_create_dataset_with_random_chunk_method",chunk_method=random_chunk_method)
+
+def test_create_dataset_with_invalid_parameter(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    valid_chunk_methods = ["naive", "manual", "qa", "table", "paper", "book", "laws", "presentation", "picture", "one",
+                           "knowledge_graph", "email"]
+    chunk_method = "invalid_chunk_method"
+    with pytest.raises(Exception) as exc_info:
+        rag.create_dataset("test_create_dataset_with_name",chunk_method=chunk_method)
+    assert str(exc_info.value) == f"{chunk_method} is not in {valid_chunk_methods}"

-    def test_update_dataset_with_success(self):
-        """
-        Test updating a dataset with success.
-        """
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        ds = rag.create_dataset("ABC")
-        if isinstance(ds, DataSet):
-            assert ds.name == "ABC", "Name does not match."
-            res = ds.update({"name":"DEF"})
-            assert res is None, f"Failed to update dataset, error: {res}"
-        else:
-            assert False, f"Failed to create dataset, error: {ds}"
+def test_update_dataset_with_name(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    ds = rag.create_dataset("test_update_dataset")
+    ds.update({"name": "updated_dataset"})

-    def test_delete_datasets_with_success(self):
-        """
-        Test deleting a dataset with success
-        """
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        ds = rag.create_dataset("MA")
-        if isinstance(ds, DataSet):
-            assert ds.name == "MA", "Name does not match."
-            res = rag.delete_datasets(ids=[ds.id])
-            assert res is None, f"Failed to delete dataset, error: {res}"
-        else:
-            assert False, f"Failed to create dataset, error: {ds}"
+def test_delete_datasets_with_success(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    ds = rag.create_dataset("MA")
+    rag.delete_datasets(ids=[ds.id])

-    def test_list_datasets_with_success(self):
-        """
-        Test listing datasets with success
-        """
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        list_datasets = rag.list_datasets()
-        assert len(list_datasets) > 0, "Do not exist any dataset"
-        for ds in list_datasets:
-            assert isinstance(ds, DataSet), "Existence type is not dataset."
+def test_list_datasets_with_success(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    rag.list_datasets()
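
Note that `test_create_dataset_with_duplicated_name` asserts on a name created by the previous test, so it is order-dependent. A self-contained variant (a sketch using the same SDK calls and the same expected message) would create the first dataset inside the test:

```python
import pytest
from ragflow import RAGFlow

def test_duplicated_name_self_contained(get_api_key_fixture):
    rag = RAGFlow(get_api_key_fixture, "http://127.0.0.1:9380")
    rag.create_dataset("dup_name")
    with pytest.raises(Exception) as exc_info:
        rag.create_dataset("dup_name")  # second create with the same name must fail
    assert str(exc_info.value) == "Duplicated dataset name in creating dataset."
```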

View File

@@ -1,298 +1,321 @@
 from ragflow import RAGFlow, DataSet, Document, Chunk
-from common import API_KEY, HOST_ADDRESS
-from test_sdkbase import TestSdk
+HOST_ADDRESS = 'http://127.0.0.1:9380'

-class TestDocument(TestSdk):
-    def test_upload_document_with_success(self):
-        """
-        Test ingesting a document into a dataset with success.
-        """
-        # Initialize RAGFlow instance
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-
-        # Step 1: Create a new dataset
-        ds = rag.create_dataset(name="God")
-
-        # Ensure dataset creation was successful
-        assert isinstance(ds, DataSet), f"Failed to create dataset, error: {ds}"
-        assert ds.name == "God", "Dataset name does not match."
-
-        # Step 2: Create a new document
-        # The blob is the actual file content or a placeholder in this case
-        blob = b"Sample document content for ingestion test."
-        blob_2 = b"test_2."
-        list_1 = []
-        list_1.append({"name":"Test_1.txt",
-                       "blob":blob})
-        list_1.append({"name":"Test_2.txt",
-                       "blob":blob_2})
-        res = ds.upload_documents(list_1)
-        # Ensure document ingestion was successful
-        assert res is None, f"Failed to create document, error: {res}"
+def test_upload_document_with_success(get_api_key_fixture):
+    """
+    Test ingesting a document into a dataset with success.
+    """
+    # Initialize RAGFlow instance
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+
+    # Step 1: Create a new dataset
+    ds = rag.create_dataset(name="God")
+
+    # Ensure dataset creation was successful
+    assert isinstance(ds, DataSet), f"Failed to create dataset, error: {ds}"
+    assert ds.name == "God", "Dataset name does not match."
+
+    # Step 2: Create a new document
+    # The blob is the actual file content or a placeholder in this case
+    blob = b"Sample document content for ingestion test."
+    blob_2 = b"test_2."
+    list_1 = []
+    list_1.append({"name": "Test_1.txt",
+                   "blob": blob})
+    list_1.append({"name": "Test_2.txt",
+                   "blob": blob_2})
+    res = ds.upload_documents(list_1)
+    # Ensure document ingestion was successful
+    assert res is None, f"Failed to create document, error: {res}"

-    def test_update_document_with_success(self):
-        """
-        Test updating a document with success.
-        Update name or chunk_method are supported
-        """
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        ds = rag.list_datasets(name="God")
-        ds = ds[0]
-        doc = ds.list_documents()
-        doc = doc[0]
-        if isinstance(doc, Document):
-            res = doc.update({"chunk_method":"manual","name":"manual.txt"})
-            assert res is None, f"Failed to update document, error: {res}"
-        else:
-            assert False, f"Failed to get document, error: {doc}"
+def test_update_document_with_success(get_api_key_fixture):
+    """
+    Test updating a document with success.
+    Update name or chunk_method are supported
+    """
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    ds = rag.list_datasets(name="God")
+    ds = ds[0]
+    doc = ds.list_documents()
+    doc = doc[0]
+    if isinstance(doc, Document):
+        res = doc.update({"chunk_method": "manual", "name": "manual.txt"})
+        assert res is None, f"Failed to update document, error: {res}"
+    else:
+        assert False, f"Failed to get document, error: {doc}"

-    def test_download_document_with_success(self):
-        """
-        Test downloading a document with success.
-        """
-        # Initialize RAGFlow instance
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-
-        # Retrieve a document
-        ds = rag.list_datasets(name="God")
-        ds = ds[0]
-        doc = ds.list_documents(name="manual.txt")
-        doc = doc[0]
-        # Check if the retrieved document is of type Document
-        if isinstance(doc, Document):
-            # Download the document content and save it to a file
-            with open("./ragflow.txt", "wb+") as file:
-                file.write(doc.download())
-            # Print the document object for debugging
-            print(doc)
-            # Assert that the download was successful
-            assert True, f"Failed to download document, error: {doc}"
-        else:
-            # If the document retrieval fails, assert failure
-            assert False, f"Failed to get document, error: {doc}"
+def test_download_document_with_success(get_api_key_fixture):
+    """
+    Test downloading a document with success.
+    """
+    API_KEY = get_api_key_fixture
+    # Initialize RAGFlow instance
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+
+    # Retrieve a document
+    ds = rag.list_datasets(name="God")
+    ds = ds[0]
+    doc = ds.list_documents(name="manual.txt")
+    doc = doc[0]
+    # Check if the retrieved document is of type Document
+    if isinstance(doc, Document):
+        # Download the document content and save it to a file
+        with open("./ragflow.txt", "wb+") as file:
+            file.write(doc.download())
+        # Print the document object for debugging
+        print(doc)
+        # Assert that the download was successful
+        assert True, f"Failed to download document, error: {doc}"
+    else:
+        # If the document retrieval fails, assert failure
+        assert False, f"Failed to get document, error: {doc}"

-    def test_list_documents_in_dataset_with_success(self):
-        """
-        Test list all documents into a dataset with success.
-        """
-        # Initialize RAGFlow instance
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-
-        # Step 1: Create a new dataset
-        ds = rag.create_dataset(name="God2")
-
-        # Ensure dataset creation was successful
-        assert isinstance(ds, DataSet), f"Failed to create dataset, error: {ds}"
-        assert ds.name == "God2", "Dataset name does not match."
-
-        # Step 2: Create a new document
-        # The blob is the actual file content or a placeholder in this case
-        name1 = "Test Document111.txt"
-        blob1 = b"Sample document content for ingestion test111."
-        name2 = "Test Document222.txt"
-        blob2 = b"Sample document content for ingestion test222."
-        list_1 = [{"name":name1,"blob":blob1},{"name":name2,"blob":blob2}]
-        ds.upload_documents(list_1)
-        for d in ds.list_documents(keywords="test", offset=0, limit=12):
-            assert isinstance(d, Document), "Failed to upload documents"
+def test_list_documents_in_dataset_with_success(get_api_key_fixture):
+    """
+    Test list all documents into a dataset with success.
+    """
+    API_KEY = get_api_key_fixture
+    # Initialize RAGFlow instance
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+
+    # Step 1: Create a new dataset
+    ds = rag.create_dataset(name="God2")
+
+    # Ensure dataset creation was successful
+    assert isinstance(ds, DataSet), f"Failed to create dataset, error: {ds}"
+    assert ds.name == "God2", "Dataset name does not match."
+
+    # Step 2: Create a new document
+    # The blob is the actual file content or a placeholder in this case
+    name1 = "Test Document111.txt"
+    blob1 = b"Sample document content for ingestion test111."
+    name2 = "Test Document222.txt"
+    blob2 = b"Sample document content for ingestion test222."
+    list_1 = [{"name": name1, "blob": blob1}, {"name": name2, "blob": blob2}]
+    ds.upload_documents(list_1)
+    for d in ds.list_documents(keywords="test", offset=0, limit=12):
+        assert isinstance(d, Document), "Failed to upload documents"

-    def test_delete_documents_in_dataset_with_success(self):
-        """
-        Test list all documents into a dataset with success.
-        """
-        # Initialize RAGFlow instance
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-
-        # Step 1: Create a new dataset
-        ds = rag.create_dataset(name="God3")
-
-        # Ensure dataset creation was successful
-        assert isinstance(ds, DataSet), f"Failed to create dataset, error: {ds}"
-        assert ds.name == "God3", "Dataset name does not match."
-
-        # Step 2: Create a new document
-        # The blob is the actual file content or a placeholder in this case
-        name1 = "Test Document333.txt"
-        blob1 = b"Sample document content for ingestion test333."
-        name2 = "Test Document444.txt"
-        blob2 = b"Sample document content for ingestion test444."
-        ds.upload_documents([{"name":name1,"blob":blob1},{"name":name2,"blob":blob2}])
-        for d in ds.list_documents(keywords="document", offset=0, limit=12):
-            assert isinstance(d, Document)
-            ds.delete_documents([d.id])
-        remaining_docs = ds.list_documents(keywords="rag", offset=0, limit=12)
-        assert len(remaining_docs) == 0, "Documents were not properly deleted."
+def test_delete_documents_in_dataset_with_success(get_api_key_fixture):
+    """
+    Test list all documents into a dataset with success.
+    """
+    API_KEY = get_api_key_fixture
+    # Initialize RAGFlow instance
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+
+    # Step 1: Create a new dataset
+    ds = rag.create_dataset(name="God3")
+
+    # Ensure dataset creation was successful
+    assert isinstance(ds, DataSet), f"Failed to create dataset, error: {ds}"
+    assert ds.name == "God3", "Dataset name does not match."
+
+    # Step 2: Create a new document
+    # The blob is the actual file content or a placeholder in this case
+    name1 = "Test Document333.txt"
+    blob1 = b"Sample document content for ingestion test333."
+    name2 = "Test Document444.txt"
+    blob2 = b"Sample document content for ingestion test444."
+    ds.upload_documents([{"name": name1, "blob": blob1}, {"name": name2, "blob": blob2}])
+    for d in ds.list_documents(keywords="document", offset=0, limit=12):
+        assert isinstance(d, Document)
+        ds.delete_documents([d.id])
+    remaining_docs = ds.list_documents(keywords="rag", offset=0, limit=12)
+    assert len(remaining_docs) == 0, "Documents were not properly deleted."

-    def test_parse_and_cancel_document(self):
-        # Initialize RAGFlow with API key and host address
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-
-        # Create a dataset with a specific name
-        ds = rag.create_dataset(name="God4")
-
-        # Define the document name and path
-        name3 = 'westworld.pdf'
-        path = './test_data/westworld.pdf'
-
-        # Create a document in the dataset using the file path
-        ds.upload_documents({"name":name3, "blob":open(path, "rb").read()})
-
-        # Retrieve the document by name
-        doc = rag.list_documents(name="westworld.pdf")
-        doc = doc[0]
-        ds.async_parse_documents(document_ids=[])
-        # Print message to confirm asynchronous parsing has been initiated
-        print("Async parsing initiated")
-
-        # Use join to wait for parsing to complete and get progress updates
-        for progress, msg in doc.join(interval=5, timeout=10):
-            print(progress, msg)
-            # Assert that the progress is within the valid range (0 to 100)
-            assert 0 <= progress <= 100, f"Invalid progress: {progress}"
-            # Assert that the message is not empty
-            assert msg, "Message should not be empty"
-        # Test cancelling the parsing operation
-        doc.cancel()
-        # Print message to confirm parsing has been cancelled successfully
-        print("Parsing cancelled successfully")
+def test_parse_and_cancel_document(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    # Initialize RAGFlow with API key and host address
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+
+    # Create a dataset with a specific name
+    ds = rag.create_dataset(name="God4")
+
+    # Define the document name and path
+    name3 = 'westworld.pdf'
+    path = './test_data/westworld.pdf'
+
+    # Create a document in the dataset using the file path
+    ds.upload_documents({"name": name3, "blob": open(path, "rb").read()})
+
+    # Retrieve the document by name
+    doc = rag.list_documents(name="westworld.pdf")
+    doc = doc[0]
+    ds.async_parse_documents(document_ids=[])
+    # Print message to confirm asynchronous parsing has been initiated
+    print("Async parsing initiated")
+
+    # Use join to wait for parsing to complete and get progress updates
+    for progress, msg in doc.join(interval=5, timeout=10):
+        print(progress, msg)
+        # Assert that the progress is within the valid range (0 to 100)
+        assert 0 <= progress <= 100, f"Invalid progress: {progress}"
+        # Assert that the message is not empty
+        assert msg, "Message should not be empty"
+    # Test cancelling the parsing operation
+    doc.cancel()
+    # Print message to confirm parsing has been cancelled successfully
+    print("Parsing cancelled successfully")

-    def test_bulk_parse_and_cancel_documents(self):
-        # Initialize RAGFlow with API key and host address
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-
-        # Create a dataset
-        ds = rag.create_dataset(name="God5")
-        assert ds is not None, "Dataset creation failed"
-        assert ds.name == "God5", "Dataset name does not match"
-
-        # Prepare a list of file names and paths
-        documents = [
-            {'name': 'test1.txt', 'path': 'test_data/test1.txt'},
-            {'name': 'test2.txt', 'path': 'test_data/test2.txt'},
-            {'name': 'test3.txt', 'path': 'test_data/test3.txt'}
-        ]
-
-        # Create documents in bulk
-        for doc_info in documents:
-            with open(doc_info['path'], "rb") as file:
-                created_doc = rag.create_document(ds, name=doc_info['name'], blob=file.read())
-                assert created_doc is not None, f"Failed to create document {doc_info['name']}"
-
-        # Retrieve document objects in bulk
-        docs = [rag.get_document(name=doc_info['name']) for doc_info in documents]
-        ids = [doc.id for doc in docs]
-        assert len(docs) == len(documents), "Mismatch between created documents and fetched documents"
-
-        # Initiate asynchronous parsing for all documents
-        rag.async_parse_documents(ids)
-        print("Async bulk parsing initiated")
-
-        # Wait for all documents to finish parsing and check progress
-        for doc in docs:
-            for progress, msg in doc.join(interval=5, timeout=10):
-                print(f"{doc.name}: Progress: {progress}, Message: {msg}")
-                # Assert that progress is within the valid range
-                assert 0 <= progress <= 100, f"Invalid progress: {progress} for document {doc.name}"
-                # Assert that the message is not empty
-                assert msg, f"Message should not be empty for document {doc.name}"
-                # If progress reaches 100%, assert that parsing is completed successfully
-                if progress == 100:
-                    assert "completed" in msg.lower(), f"Document {doc.name} did not complete successfully"
-
-        # Cancel parsing for all documents in bulk
-        cancel_result = rag.async_cancel_parse_documents(ids)
-        assert cancel_result is None or isinstance(cancel_result, type(None)), "Failed to cancel document parsing"
-        print("Async bulk parsing cancelled")
+def test_bulk_parse_and_cancel_documents(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    # Initialize RAGFlow with API key and host address
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+
+    # Create a dataset
+    ds = rag.create_dataset(name="God5")
+    assert ds is not None, "Dataset creation failed"
+    assert ds.name == "God5", "Dataset name does not match"
+
+    # Prepare a list of file names and paths
+    documents = [
+        {'name': 'test1.txt', 'path': 'test_data/test1.txt'},
+        {'name': 'test2.txt', 'path': 'test_data/test2.txt'},
+        {'name': 'test3.txt', 'path': 'test_data/test3.txt'}
+    ]
+
+    # Create documents in bulk
+    for doc_info in documents:
+        with open(doc_info['path'], "rb") as file:
+            created_doc = rag.create_document(ds, name=doc_info['name'], blob=file.read())
+            assert created_doc is not None, f"Failed to create document {doc_info['name']}"
+
+    # Retrieve document objects in bulk
+    docs = [rag.get_document(name=doc_info['name']) for doc_info in documents]
+    ids = [doc.id for doc in docs]
+    assert len(docs) == len(documents), "Mismatch between created documents and fetched documents"
+
+    # Initiate asynchronous parsing for all documents
+    rag.async_parse_documents(ids)
+    print("Async bulk parsing initiated")
+
+    # Wait for all documents to finish parsing and check progress
+    for doc in docs:
+        for progress, msg in doc.join(interval=5, timeout=10):
+            print(f"{doc.name}: Progress: {progress}, Message: {msg}")
+            # Assert that progress is within the valid range
+            assert 0 <= progress <= 100, f"Invalid progress: {progress} for document {doc.name}"
+            # Assert that the message is not empty
+            assert msg, f"Message should not be empty for document {doc.name}"
+            # If progress reaches 100%, assert that parsing is completed successfully
+            if progress == 100:
+                assert "completed" in msg.lower(), f"Document {doc.name} did not complete successfully"
+
+    # Cancel parsing for all documents in bulk
+    cancel_result = rag.async_cancel_parse_documents(ids)
+    assert cancel_result is None or isinstance(cancel_result, type(None)), "Failed to cancel document parsing"
+    print("Async bulk parsing cancelled")

-    def test_parse_document_and_chunk_list(self):
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        ds = rag.create_dataset(name="God7")
-        name = 'story.txt'
-        path = 'test_data/story.txt'
-        # name = "Test Document rag.txt"
-        # blob = " Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps.Sample document content for rag test66. rag wonderful apple os documents apps.Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps."
-        rag.create_document(ds, name=name, blob=open(path, "rb").read())
-        doc = rag.get_document(name=name)
-        doc.async_parse()
-
-        # Wait for parsing to complete and get progress updates using join
-        for progress, msg in doc.join(interval=5, timeout=30):
-            print(progress, msg)
-            # Assert that progress is within 0 to 100
-            assert 0 <= progress <= 100, f"Invalid progress: {progress}"
-            # Assert that the message is not empty
-            assert msg, "Message should not be empty"
-
-        for c in doc.list_chunks(keywords="rag", offset=0, limit=12):
-            print(c)
-            assert c is not None, "Chunk is None"
-            assert "rag" in c['content_with_weight'].lower(), f"Keyword 'rag' not found in chunk content: {c.content}"
+def test_parse_document_and_chunk_list(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    ds = rag.create_dataset(name="God7")
+    name = 'story.txt'
+    path = 'test_data/story.txt'
+    # name = "Test Document rag.txt"
+    # blob = " Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps.Sample document content for rag test66. rag wonderful apple os documents apps.Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps. Sample document content for rag test66. rag wonderful apple os documents apps."
+    rag.create_document(ds, name=name, blob=open(path, "rb").read())
+    doc = rag.get_document(name=name)
+    doc.async_parse()
+
+    # Wait for parsing to complete and get progress updates using join
+    for progress, msg in doc.join(interval=5, timeout=30):
+        print(progress, msg)
+        # Assert that progress is within 0 to 100
+        assert 0 <= progress <= 100, f"Invalid progress: {progress}"
+        # Assert that the message is not empty
+        assert msg, "Message should not be empty"
+
+    for c in doc.list_chunks(keywords="rag", offset=0, limit=12):
+        print(c)
+        assert c is not None, "Chunk is None"
+        assert "rag" in c['content_with_weight'].lower(), f"Keyword 'rag' not found in chunk content: {c.content}"

-    def test_add_chunk_to_chunk_list(self):
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        doc = rag.get_document(name='story.txt')
-        chunk = doc.add_chunk(content="assssdd")
-        assert chunk is not None, "Chunk is None"
-        assert isinstance(chunk, Chunk), "Chunk was not added to chunk list"
+def test_add_chunk_to_chunk_list(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    doc = rag.get_document(name='story.txt')
+    chunk = doc.add_chunk(content="assssdd")
+    assert chunk is not None, "Chunk is None"
+    assert isinstance(chunk, Chunk), "Chunk was not added to chunk list"

-    def test_delete_chunk_of_chunk_list(self):
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        doc = rag.get_document(name='story.txt')
-        chunk = doc.add_chunk(content="assssdd")
-        assert chunk is not None, "Chunk is None"
-        assert isinstance(chunk, Chunk), "Chunk was not added to chunk list"
-        doc = rag.get_document(name='story.txt')
-        chunk_count_before = doc.chunk_count
-        chunk.delete()
-        doc = rag.get_document(name='story.txt')
-        assert doc.chunk_count == chunk_count_before - 1, "Chunk was not deleted"
+def test_delete_chunk_of_chunk_list(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    doc = rag.get_document(name='story.txt')
+    chunk = doc.add_chunk(content="assssdd")
+    assert chunk is not None, "Chunk is None"
+    assert isinstance(chunk, Chunk), "Chunk was not added to chunk list"
+    doc = rag.get_document(name='story.txt')
+    chunk_count_before = doc.chunk_count
+    chunk.delete()
+    doc = rag.get_document(name='story.txt')
+    assert doc.chunk_count == chunk_count_before - 1, "Chunk was not deleted"

-    def test_update_chunk_content(self):
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        doc = rag.get_document(name='story.txt')
-        chunk = doc.add_chunk(content="assssddd")
-        assert chunk is not None, "Chunk is None"
-        assert isinstance(chunk, Chunk), "Chunk was not added to chunk list"
-        chunk.content = "ragflow123"
-        res = chunk.save()
-        assert res is True, f"Failed to update chunk content, error: {res}"
+def test_update_chunk_content(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    doc = rag.get_document(name='story.txt')
+    chunk = doc.add_chunk(content="assssddd")
+    assert chunk is not None, "Chunk is None"
+    assert isinstance(chunk, Chunk), "Chunk was not added to chunk list"
+    chunk.content = "ragflow123"
+    res = chunk.save()
+    assert res is True, f"Failed to update chunk content, error: {res}"

-    def test_update_chunk_available(self):
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        doc = rag.get_document(name='story.txt')
-        chunk = doc.add_chunk(content="ragflow")
-        assert chunk is not None, "Chunk is None"
-        assert isinstance(chunk, Chunk), "Chunk was not added to chunk list"
-        chunk.available = 0
-        res = chunk.save()
-        assert res is True, f"Failed to update chunk status, error: {res}"
+def test_update_chunk_available(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    doc = rag.get_document(name='story.txt')
+    chunk = doc.add_chunk(content="ragflow")
+    assert chunk is not None, "Chunk is None"
+    assert isinstance(chunk, Chunk), "Chunk was not added to chunk list"
+    chunk.available = 0
+    res = chunk.save()
+    assert res is True, f"Failed to update chunk status, error: {res}"

-    def test_retrieval_chunks(self):
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        ds = rag.create_dataset(name="God8")
-        name = 'ragflow_test.txt'
-        path = 'test_data/ragflow_test.txt'
-        rag.create_document(ds, name=name, blob=open(path, "rb").read())
-        doc = rag.get_document(name=name)
-        doc.async_parse()
-        # Wait for parsing to complete and get progress updates using join
-        for progress, msg in doc.join(interval=5, timeout=30):
-            print(progress, msg)
-            assert 0 <= progress <= 100, f"Invalid progress: {progress}"
-            assert msg, "Message should not be empty"
-        for c in rag.retrieval(question="What's ragflow?",
-                               datasets=[ds.id], documents=[doc],
-                               offset=0, limit=6, similarity_threshold=0.1,
-                               vector_similarity_weight=0.3,
-                               top_k=1024
-                               ):
-            print(c)
-            assert c is not None, "Chunk is None"
-            assert "ragflow" in c.content.lower(), f"Keyword 'rag' not found in chunk content: {c.content}"
+def test_retrieval_chunks(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    ds = rag.create_dataset(name="God8")
+    name = 'ragflow_test.txt'
+    path = 'test_data/ragflow_test.txt'
+    rag.create_document(ds, name=name, blob=open(path, "rb").read())
+    doc = rag.get_document(name=name)
+    doc.async_parse()
+    # Wait for parsing to complete and get progress updates using join
+    for progress, msg in doc.join(interval=5, timeout=30):
+        print(progress, msg)
+        assert 0 <= progress <= 100, f"Invalid progress: {progress}"
+        assert msg, "Message should not be empty"
+    for c in rag.retrieval(question="What's ragflow?",
+                           datasets=[ds.id], documents=[doc],
+                           offset=0, limit=6, similarity_threshold=0.1,
+                           vector_similarity_weight=0.3,
+                           top_k=1024
+                           ):
+        print(c)
+        assert c is not None, "Chunk is None"
+        assert "ragflow" in c.content.lower(), f"Keyword 'rag' not found in chunk content: {c.content}"

View File

@@ -1,52 +1,110 @@
 from ragflow import RAGFlow,Session
-from common import API_KEY, HOST_ADDRESS
+import time
+HOST_ADDRESS = 'http://127.0.0.1:9380'

-class TestSession:
-    def test_create_session(self):
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        kb = rag.create_dataset(name="test_create_session")
-        assistant = rag.create_chat(name="test_create_session", datasets=[kb])
-        session = assistant.create_session()
-        assert isinstance(session,Session), "Failed to create a session."
+def test_create_session_with_success(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    kb = rag.create_dataset(name="test_create_session")
+    displayed_name = "ragflow.txt"
+    with open("./ragflow.txt", "rb") as file:
+        blob = file.read()
+    document = {"displayed_name": displayed_name, "blob": blob}
+    documents = []
+    documents.append(document)
+    doc_ids = []
+    docs = kb.upload_documents(documents)
+    for doc in docs:
+        doc_ids.append(doc.id)
+    kb.async_parse_documents(doc_ids)
+    time.sleep(60)
+    assistant = rag.create_chat(name="test_create_session", datasets=[kb])
+    assistant.create_session()

-    def test_create_chat_with_success(self):
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        kb = rag.create_dataset(name="test_create_chat")
-        assistant = rag.create_chat(name="test_create_chat", datasets=[kb])
-        session = assistant.create_session()
-        question = "What is AI"
-        for ans in session.ask(question, stream=True):
-            pass
-        assert not ans.content.startswith("**ERROR**"), "Please check this error."
+def test_create_conversation_with_success(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    kb = rag.create_dataset(name="test_create_conversation")
+    displayed_name = "ragflow.txt"
+    with open("./ragflow.txt","rb") as file:
+        blob = file.read()
+    document = {"displayed_name":displayed_name,"blob":blob}
+    documents = []
+    documents.append(document)
+    doc_ids = []
+    docs= kb.upload_documents(documents)
+    for doc in docs:
+        doc_ids.append(doc.id)
+    kb.async_parse_documents(doc_ids)
+    time.sleep(60)
+    assistant = rag.create_chat(name="test_create_conversation", datasets=[kb])
+    session = assistant.create_session()
+    question = "What is AI"
+    for ans in session.ask(question, stream=True):
+        pass
+    assert not ans.content.startswith("**ERROR**"), "Please check this error."

-    def test_delete_sessions_with_success(self):
-        rag = RAGFlow(API_KEY, HOST_ADDRESS)
-        kb = rag.create_dataset(name="test_delete_session")
-        assistant = rag.create_chat(name="test_delete_session",datasets=[kb])
-        session=assistant.create_session()
-        res=assistant.delete_sessions(ids=[session.id])
-        assert res is None, "Failed to delete the dataset."
+def test_delete_sessions_with_success(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    kb = rag.create_dataset(name="test_delete_session")
+    displayed_name = "ragflow.txt"
+    with open("./ragflow.txt","rb") as file:
+        blob = file.read()
+    document = {"displayed_name":displayed_name,"blob":blob}
+    documents = []
+    documents.append(document)
+    doc_ids = []
+    docs= kb.upload_documents(documents)
+    for doc in docs:
+        doc_ids.append(doc.id)
+    kb.async_parse_documents(doc_ids)
+    time.sleep(60)
+    assistant = rag.create_chat(name="test_delete_session", datasets=[kb])
+    session = assistant.create_session()
+    assistant.delete_sessions(ids=[session.id])

-    def test_update_session_with_success(self):
-        rag=RAGFlow(API_KEY,HOST_ADDRESS)
-        kb=rag.create_dataset(name="test_update_session")
-        assistant = rag.create_chat(name="test_update_session",datasets=[kb])
-        session=assistant.create_session(name="old session")
-        res=session.update({"name":"new session"})
-        assert res is None,"Failed to update the session"
+def test_update_session_with_name(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    kb = rag.create_dataset(name="test_update_session")
+    displayed_name = "ragflow.txt"
+    with open("./ragflow.txt","rb") as file:
+        blob = file.read()
+    document = {"displayed_name":displayed_name,"blob":blob}
+    documents = []
+    documents.append(document)
+    doc_ids = []
+    docs= kb.upload_documents(documents)
+    for doc in docs:
+        doc_ids.append(doc.id)
+    kb.async_parse_documents(doc_ids)
+    time.sleep(60)
+    assistant = rag.create_chat(name="test_update_session", datasets=[kb])
+    session = assistant.create_session(name="old session")
+    session.update({"name": "new session"})

-    def test_list_sessions_with_success(self):
-        rag=RAGFlow(API_KEY,HOST_ADDRESS)
-        kb=rag.create_dataset(name="test_list_session")
-        assistant=rag.create_chat(name="test_list_session",datasets=[kb])
-        assistant.create_session("test_1")
-        assistant.create_session("test_2")
-        sessions=assistant.list_sessions()
-        if isinstance(sessions,list):
-            for session in sessions:
-                assert isinstance(session,Session),"Non-Session elements exist in the list"
-        else :
-            assert False,"Failed to retrieve the session list."
+def test_list_sessions_with_success(get_api_key_fixture):
+    API_KEY = get_api_key_fixture
+    rag = RAGFlow(API_KEY, HOST_ADDRESS)
+    kb = rag.create_dataset(name="test_list_session")
+    displayed_name = "ragflow.txt"
+    with open("./ragflow.txt","rb") as file:
+        blob = file.read()
+    document = {"displayed_name":displayed_name,"blob":blob}
+    documents = []
+    documents.append(document)
+    doc_ids = []
+    docs= kb.upload_documents(documents)
+    for doc in docs:
+        doc_ids.append(doc.id)
+    kb.async_parse_documents(doc_ids)
+    time.sleep(60)
+    assistant = rag.create_chat(name="test_list_session", datasets=[kb])
+    assistant.create_session("test_1")
+    assistant.create_session("test_2")
+    assistant.list_sessions()
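
`session.ask(..., stream=True)` yields incremental answers; the tests drain the generator and inspect only the final message. A sketch that surfaces the stream instead; `ask_and_check` is a hypothetical helper, and the `content` attribute and error sentinel are taken from the tests above:

```python
def ask_and_check(session, question="What is AI"):
    # session: a Session from assistant.create_session(), as in the tests above
    ans = None
    for ans in session.ask(question, stream=True):
        print(ans.content)  # cumulative answer text so far
    assert ans is not None and not ans.content.startswith("**ERROR**")
```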

View File

@@ -1,3 +0,0 @@
-class TestSdk():
-    def test_version(self):
-        print("test_sdk")