Merge pull request #26 from valueonag/int

Int
This commit is contained in:
ValueOn AG 2025-09-04 23:56:52 +02:00 committed by GitHub
commit b16e1e7175
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 1297 additions and 291 deletions

View file

@ -792,7 +792,7 @@ class HandlingTasks:
# Add specific error details if available
if retry_context and hasattr(retry_context, 'previous_review_result') and retry_context.previous_review_result:
reason = retry_context.previous_review_result.get('reason', '')
reason = retry_context.previous_review_result.reason or ''
if reason and reason != "Task failed after all retries.":
error_message += f"{reason}\n\n"

View file

@ -11,6 +11,85 @@ logger = logging.getLogger(__name__)
# Prompt creation helpers extracted from managerChat.py
def _getPreviousRoundContext(service, workflow) -> str:
    """Build a textual summary of earlier workflow rounds for follow-up prompts.

    Parameters:
        service: Owning service (currently unused here; kept for signature
            parity with the other prompt helpers in this module).
        workflow: Workflow object carrying `messages` and `currentRound`.

    Returns:
        str: One "ROUND N CONTEXT:" section per previous round, or "" when
        there is no usable history. Never raises: any error is logged and ""
        is returned so prompt building can continue.
    """
    try:
        if not workflow or not hasattr(workflow, 'messages') or not workflow.messages:
            return ""
        current_round = getattr(workflow, 'currentRound', 0)
        # Rounds 0 and 1 have no earlier history to summarize.
        if current_round <= 1:
            return ""
        # Keep only messages from valid earlier rounds (round numbers > 0).
        previous_messages = [
            msg for msg in workflow.messages
            if 0 < getattr(msg, 'roundNumber', 0) < current_round
        ]
        if not previous_messages:
            return ""
        # Chronological order: by round, then by sequence within the round.
        previous_messages.sort(
            key=lambda msg: (getattr(msg, 'roundNumber', 0), getattr(msg, 'sequenceNr', 0))
        )
        # Bucket each round's messages by category. Note: the
        # 'assistant_responses' bucket is collected but intentionally not
        # emitted below (matches the original behavior).
        rounds = {}
        for msg in previous_messages:
            round_num = getattr(msg, 'roundNumber', 0)
            bucket = rounds.setdefault(round_num, {
                'user_inputs': [],
                'assistant_responses': [],
                'task_outcomes': [],
                'documents_processed': []
            })
            if msg.role == 'user':
                bucket['user_inputs'].append(msg.message)
            elif msg.role == 'assistant':
                lowered = msg.message.lower()
                # Heuristic: assistant messages that mention a task outcome are
                # reported separately from ordinary responses.
                if 'task' in lowered and any(w in lowered for w in ('completed', 'failed', 'error')):
                    bucket['task_outcomes'].append(msg.message)
                else:
                    bucket['assistant_responses'].append(msg.message)
            # Documents may accompany a message of any role.
            if hasattr(msg, 'documents') and msg.documents:
                doc_names = [doc.fileName for doc in msg.documents if hasattr(doc, 'fileName')]
                if doc_names:
                    bucket['documents_processed'].extend(doc_names)
        context_parts = []
        for round_num in sorted(rounds):
            round_data = rounds[round_num]
            context_parts.append(f"ROUND {round_num} CONTEXT:")
            if round_data['user_inputs']:
                context_parts.append(f" User requests: {'; '.join(round_data['user_inputs'])}")
            if round_data['task_outcomes']:
                context_parts.append(f" Task outcomes: {'; '.join(round_data['task_outcomes'])}")
            if round_data['documents_processed']:
                # FIX: the original used set(), which yields a nondeterministic
                # document order; dict.fromkeys dedupes while preserving
                # first-seen order, so the prompt text is stable across runs.
                unique_docs = dict.fromkeys(round_data['documents_processed'])
                context_parts.append(f" Documents processed: {', '.join(unique_docs)}")
        return "\n".join(context_parts) if context_parts else ""
    except Exception as e:
        logger.error(f"Error getting previous round context: {str(e)}")
        return ""
def createTaskPlanningPrompt(context: TaskContext, service) -> str:
"""Create enhanced prompt for task planning with user-friendly message generation and language detection"""
# Get user language directly from service.user.language
@ -22,21 +101,29 @@ def createTaskPlanningPrompt(context: TaskContext, service) -> str:
# Extract available documents from context - use Pydantic model directly
available_documents = context.available_documents or "No documents available"
# Get previous workflow round context for better understanding of follow-up prompts
previous_round_context = _getPreviousRoundContext(service, context.workflow)
return f"""You are a task planning AI that analyzes user requests and creates structured task plans with user-friendly feedback messages.
USER REQUEST: {user_request}
AVAILABLE DOCUMENTS: {available_documents}
PREVIOUS WORKFLOW ROUNDS CONTEXT:
{previous_round_context if previous_round_context else "No previous workflow rounds - this is the first round."}
INSTRUCTIONS:
1. Analyze the user request and available documents
2. Group related topics and sequential steps into single, comprehensive tasks
3. Focus on business outcomes, not technical operations
4. Each task should produce meaningful, usable outputs
5. Ensure proper handover between tasks using result labels
6. Detect the language of the user request and include it in languageUserDetected
7. Generate user-friendly messages for each task in the user's request language
8. Return a JSON object with the exact structure shown below
1. Analyze the user request, available documents, and previous workflow rounds context
2. If the user request appears to be a follow-up (like "try again", "versuche es nochmals", "retry", etc.),
use the PREVIOUS WORKFLOW ROUNDS CONTEXT to understand what the user wants to retry or continue
3. Group related topics and sequential steps into single, comprehensive tasks
4. Focus on business outcomes, not technical operations
5. Each task should produce meaningful, usable outputs
6. Ensure proper handover between tasks using result labels
7. Detect the language of the user request and include it in languageUserDetected
8. Generate user-friendly messages for each task in the user's request language
9. Return a JSON object with the exact structure shown below
TASK GROUPING PRINCIPLES:
- COMBINE RELATED TOPICS: Group related subjects, sequential steps, or workflow-structured activities into single tasks
@ -67,6 +154,21 @@ TASK PLANNING PRINCIPLES:
- Group related activities to minimize task fragmentation
- Only create multiple tasks when dealing with truly different, independent objectives
FOLLOW-UP PROMPT HANDLING:
- If the user request is a follow-up (e.g., "try again", "versuche es nochmals", "retry", "continue", "proceed"),
analyze the PREVIOUS WORKFLOW ROUNDS CONTEXT to understand what failed or was incomplete
- Use the previous round's user requests and task outcomes to determine what the user wants to retry
- If previous rounds failed due to missing documents, and documents are now available,
create tasks that use the newly available documents to accomplish the original request
- Maintain the same business objective from previous rounds but adapt to current available resources
SPECIFIC SCENARIO HANDLING:
- If previous round failed with "documents missing" error and current round has documents available,
the user likely wants to retry the same operation with the newly provided documents
- Example: Previous round "speichere mir die 3 dokumente im sharepoint unter xxx" failed due to missing documents,
current round "versuche es nochmals" with documents should retry the SharePoint save operation
- Always check if the current request is a retry by looking for retry keywords and previous round context
REQUIRED JSON STRUCTURE:
{{
"overview": "Brief description of the overall plan",
@ -126,20 +228,81 @@ async def createActionDefinitionPrompt(context: TaskContext, service) -> str:
connRefs = service.getConnectionReferenceList()
# Debug logging for connections
logging.debug(f"Connection references retrieved: {connRefs}")
logging.debug(f"Connection references type: {type(connRefs)}")
logging.debug(f"Connection references length: {len(connRefs) if connRefs else 0}")
# Log document availability for debugging
logging.debug(f"Enhanced document context length: {len(available_documents_str)}")
available_methods_str = ''
# Create a structured JSON format for better AI parsing
# This replaces the old hard-to-read format with a clean JSON structure
# that the AI can easily parse and understand
available_methods_json = {}
for method, actions in method_actions.items():
available_methods_str += f"- {method}:\n"
available_methods_json[method] = {}
# Get the method instance for accessing docstrings
method_instance = service.methods.get(method, {}).get('instance') if hasattr(service, 'methods') else None
for action, sig in actions:
available_methods_str += f" - {action}: {sig}\n"
# Parse the signature to extract parameters
if '(' in sig and ')' in sig:
# Extract parameters from signature
params_start = sig.find('(')
params_end = sig.find(')')
params_str = sig[params_start+1:params_end]
# Parse parameters directly from the docstring - much simpler and more reliable!
parameters = []
# Get the actual function's docstring
if method_instance and hasattr(method_instance, action):
func = getattr(method_instance, action)
if hasattr(func, '__doc__') and func.__doc__:
docstring = func.__doc__
# Parse Parameters section from docstring
lines = docstring.split('\n')
in_parameters = False
for i, line in enumerate(lines):
original_line = line
line = line.strip()
if line == 'Parameters:':
in_parameters = True
continue
elif in_parameters and line and not original_line.startswith(' ') and not original_line.startswith('\t'):
# End of parameters section
break
elif in_parameters and (original_line.startswith(' ') or original_line.startswith('\t')):
# This is a parameter line - already stripped
# Format: "paramName (type): description"
if ':' in line:
# Find the colon that separates param from description
colon_pos = line.find(':')
param_part = line[:colon_pos].strip()
description = line[colon_pos+1:].strip()
# Parse parameter name and type
if '(' in param_part and ')' in param_part:
param_name = param_part.split('(')[0].strip()
type_part = param_part[param_part.find('(')+1:param_part.find(')')].strip()
# Check if optional
is_optional = 'optional' in type_part
param_type = type_part.replace('optional', '').strip().rstrip(',').strip()
parameters.append({
"name": param_name,
"type": param_type,
"description": description,
"required": not is_optional
})
available_methods_json[method][action] = {
"signature": sig,
"parameters": parameters,
"description": f"{method}.{action} action"
}
# Convert to a compact, AI-friendly format
available_methods_str = f"""
AVAILABLE ACTIONS (JSON format for better AI parsing):
{json.dumps(available_methods_json, indent=1, separators=(',', ':'))}
"""
retry_context = ""
if context.retry_count and context.retry_count > 0:
retry_context = f"""
@ -162,10 +325,10 @@ Previous action results that failed or were incomplete:
if context.previous_review_result:
retry_context += f"""
Previous review feedback:
- Status: {context.previous_review_result.get('status', 'unknown') or 'unknown'}
- Reason: {context.previous_review_result.get('reason', 'No reason provided') or 'No reason provided'}
- Quality Score: {context.previous_review_result.get('quality_score', 0) or 0}/10
- Unmet Criteria: {', '.join(context.previous_review_result.get('unmet_criteria', []) or [])}
- Status: {context.previous_review_result.status or 'unknown'}
- Reason: {context.previous_review_result.reason or 'No reason provided'}
- Quality Score: {context.previous_review_result.quality_score or 0}/10
- Unmet Criteria: {', '.join(context.previous_review_result.unmet_criteria or [])}
"""
# Use Pydantic model directly - no need for getattr

View file

@ -845,4 +845,40 @@ register_model_labels(
}
)
# ===== Centralized AI Call Response Models =====
class AiResult(BaseModel, ModelMixin):
    """One result document produced by a centralized AI call.

    Pydantic model: the Field descriptions double as schema documentation
    for consumers of the serialized form.
    """
    filename: str = Field(description="Name of the result document")
    mimetype: str = Field(description="MIME type of the result document")
    content: str = Field(description="Content of the result document")


# Register en/fr display labels for AiResult and each of its fields
# (presumably consumed by UI layers through register_model_labels — confirm).
register_model_labels(
    "AiResult",
    {"en": "Result Document", "fr": "Document de résultat"},
    {
        "filename": {"en": "Filename", "fr": "Nom de fichier"},
        "mimetype": {"en": "MIME Type", "fr": "Type MIME"},
        "content": {"en": "Content", "fr": "Contenu"}
    }
)
class CentralizedAiResponse(BaseModel, ModelMixin):
    """Standardized envelope returned by centralized AI calls.

    Carries zero or more AiResult documents plus a success flag; `error` is
    meant to hold a message when the call failed (no validator enforces the
    success/error pairing here — confirm against callers).
    """
    aiResults: List[AiResult] = Field(default_factory=list, description="List of result documents")
    success: bool = Field(description="Whether the AI call was successful")
    error: Optional[str] = Field(None, description="Error message if the call failed")


# Register en/fr display labels for CentralizedAiResponse and its fields.
register_model_labels(
    "CentralizedAiResponse",
    {"en": "Centralized AI Response", "fr": "Réponse IA centralisée"},
    {
        "aiResults": {"en": "Result Documents", "fr": "Documents de résultat"},
        "success": {"en": "Success", "fr": "Succès"},
        "error": {"en": "Error", "fr": "Erreur"}
    }
)

View file

@ -618,23 +618,24 @@ class MethodDocument(MethodBase):
# Create AI prompt for comprehensive report generation using user's prompt
combinedContent = "\n\n".join(allContent)
aiPrompt = f"""
{prompt}
Report Title: {title}
Additional Requirements:
1. Create a professional, well-formatted HTML report
2. Include an executive summary at the beginning
3. Organize information logically with clear sections
4. Highlight key findings and insights
5. Include relevant data, statistics, and conclusions
6. Use proper HTML formatting with headers, lists, and styling
7. Make it readable and professional
Document Content:
{combinedContent}
Generate a complete HTML report that addresses the user's specific requirements and integrates all the information into a cohesive, professional document.
{prompt}
Report Title: {title}
Additional Requirements:
1. Create a professional, well-formatted HTML report
2. Include an executive summary at the beginning
3. Organize information logically with clear sections
4. Highlight key findings and insights
5. Include relevant data, statistics, and conclusions
6. Use proper HTML formatting with headers, lists, and styling
7. Make it readable and professional
Document Content:
---START OF DOCUMENT CONTENT-----------------------------------------------
{combinedContent}
---END OF DOCUMENT CONTENT-----------------------------------------------
Generate a complete HTML report that addresses the user's specific requirements and integrates all the information into a cohesive, professional document.
"""
# Call AI to generate the report

View file

@ -379,10 +379,7 @@ class MethodOutlook(MethodBase):
connectionReference (str): Reference to the Microsoft connection
folder (str, optional): Email folder to read from (default: "Inbox")
limit (int, optional): Maximum number of emails to read (default: 10)
filter (str, optional): Filter criteria for emails. Supports:
- Email address (e.g., "user@domain.com") - filters by sender
- Search queries (e.g., "from:user@domain.com", "subject:meeting")
- Text content (e.g., "project update") - searches in subject
filter (str, optional): Filter criteria for emails. Supports: Email address (e.g., "user@domain.com") - filters by sender, Search queries (e.g., "from:user@domain.com", "subject:meeting"), Text content (e.g., "project update") - searches in subject
expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
"""
try:

File diff suppressed because it is too large Load diff

View file

@ -120,7 +120,7 @@ async def login(
access_type="offline",
include_granted_scopes="true",
state=state_param,
prompt="select_account"
prompt="consent select_account"
)
logger.info(f"Generated Google OAuth URL using OAuth2Session: {auth_url}")
@ -166,6 +166,33 @@ async def auth_callback(code: str, state: str, request: Request) -> HTMLResponse
"token_type": token_data.get("token_type", "bearer"),
"expires_in": token_data.get("expires_in", 0)
}
# If Google did not return a refresh_token, try to reuse an existing one for this user/connection
if not token_response.get("refresh_token"):
try:
rootInterface = getRootInterface()
# Prefer connection flow reuse; fallback to user access token
if connection_id:
existing_tokens = rootInterface.db.getRecordset("tokens", recordFilter={
"connectionId": connection_id,
"authority": AuthAuthority.GOOGLE
})
if existing_tokens:
# Use most recent by createdAt
existing_tokens.sort(key=lambda x: x.get("createdAt", 0), reverse=True)
token_response["refresh_token"] = existing_tokens[0].get("tokenRefresh", "")
if not token_response.get("refresh_token") and user_id:
existing_access_tokens = rootInterface.db.getRecordset("tokens", recordFilter={
"userId": user_id,
"connectionId": None,
"authority": AuthAuthority.GOOGLE
})
if existing_access_tokens:
existing_access_tokens.sort(key=lambda x: x.get("createdAt", 0), reverse=True)
token_response["refresh_token"] = existing_access_tokens[0].get("tokenRefresh", "")
except Exception:
# Non-fatal; continue without refresh token
pass

View file

@ -98,8 +98,7 @@ class TokenManager:
"client_id": self.google_client_id,
"client_secret": self.google_client_secret,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"scope": "https://www.googleapis.com/auth/gmail.readonly https://www.googleapis.com/auth/userinfo.profile https://www.googleapis.com/auth/userinfo.email openid"
"refresh_token": refresh_token
}
# Make refresh request

311
test_graph_search.py Normal file
View file

@ -0,0 +1,311 @@
#!/usr/bin/env python3
"""
Simple test script for Microsoft Graph Search API
Tests folder search queries directly
"""
import requests
import json
import sys
import os
# Add the gateway modules to the path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def test_graph_folders_direct(access_token, timeout=30):
    """List the children of the site drive root via a direct Graph API call.

    Parameters:
        access_token (str): OAuth bearer token for Microsoft Graph
        timeout (int, optional): HTTP timeout in seconds (default: 30).
            FIX: the original request had no timeout and could hang forever.
    """
    print("🔍 Testing direct Graph API folder listing...")
    # Site-relative addressing (host:/sites/name:) avoids a separate lookup
    # for the site id before reaching the drive root.
    url = "https://graph.microsoft.com/v1.0/sites/pcuster.sharepoint.com:/sites/SSSRESYNachfolge:/drive/root/children"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        if response.status_code != 200:
            print(f"❌ ERROR - Status {response.status_code}")
            print(f"Error: {response.text[:200]}")
            return
        items = response.json().get('value', [])
        print(f"✅ SUCCESS - Found {len(items)} items in root")
        # driveItems carry a 'folder' or 'file' facet; partition on that.
        folders = [item for item in items if 'folder' in item]
        files = [item for item in items if 'file' in item]
        print(f" 📁 Folders: {len(folders)}")
        print(f" 📄 Files: {len(files)}")
        if folders:
            print("\n📁 FOLDERS found:")
            for i, folder in enumerate(folders[:5], 1):
                print(f" {i}. {folder.get('name', 'No name')}")
                print(f" URL: {folder.get('webUrl', 'No URL')}")
                print()
    except Exception as e:
        # Broad catch kept deliberately: this is a diagnostic script and any
        # failure should be printed, not crash the run.
        print(f"Exception: {str(e)}")
# File extensions used to infer "file" when Graph omits the file facet.
_KNOWN_FILE_EXTENSIONS = ('.pdf', '.docx', '.xlsx', '.pptx', '.txt', '.cs', '.py', '.js', '.html', '.css')


def _classify_drive_item(resource):
    """Classify a Graph driveItem resource as 'folder', 'file', or 'other'.

    Prefers the explicit folder/file facets; otherwise falls back to
    name/URL heuristics (known extension implies file; extension-less name
    with a path implies folder).
    """
    if 'folder' in resource:
        return 'folder'
    if 'file' in resource:
        return 'file'
    name = resource.get('name', '')
    web_url = resource.get('webUrl', '')
    if '.' in name and name.lower().endswith(_KNOWN_FILE_EXTENSIONS):
        return 'file'
    if '.' not in name and ('/' in web_url or '\\' in web_url):
        return 'folder'
    return 'other'


def _display_path(web_url):
    """Render a SharePoint webUrl as a backslash path relative to the site."""
    marker = '/sites/SSSRESYNachfolge/'
    if marker in web_url:
        tail = web_url.split(marker)[-1]
        return '\\' + tail.replace('/', '\\')
    return web_url


def test_graph_search(access_token, query_string, timeout=30):
    """Run one Microsoft Graph Search API query and print a categorized summary.

    Parameters:
        access_token (str): OAuth bearer token for Microsoft Graph
        query_string (str): KQL-style search query
        timeout (int, optional): HTTP timeout in seconds (default: 30).
            FIX: the original request had no timeout and could hang forever.

    FIX: the original duplicated the folder/file detection logic three times
    with inconsistent fallbacks (the folder filter could classify a
    file-faceted item as a folder). All three uses now share
    _classify_drive_item, so counts, samples and the folder list agree.
    """
    url = "https://graph.microsoft.com/v1.0/search/query"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    payload = {
        "requests": [
            {
                "entityTypes": ["driveItem"],
                "query": {"queryString": query_string},
                "from": 0,
                "size": 50
            }
        ]
    }
    print(f"Testing: {query_string}")
    print("-" * 50)
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        if response.status_code != 200:
            error_text = response.text[:200] + "..." if len(response.text) > 200 else response.text
            print(f"❌ ERROR - Status {response.status_code}")
            print(f"Error: {error_text}")
            return
        data = response.json()
        if "value" not in data or not data["value"]:
            print("❌ SUCCESS but no value array in response")
            return
        hits_containers = data["value"][0].get("hitsContainers", [])
        if not hits_containers:
            print("❌ SUCCESS but no hits containers found")
            return
        total = hits_containers[0].get("total", 0)
        results = hits_containers[0].get("hits", [])
        print(f"✅ SUCCESS - Found {total} results")
        print(f"📊 Analyzing {len(results)} results...")
        if results:
            # Show the raw shape of the first hit so detection problems stay debuggable.
            first_result = results[0]
            print("🔍 DEBUG: First result structure:")
            print(f" Keys: {list(first_result.keys())}")
            if 'resource' in first_result:
                resource = first_result['resource']
                print(f" Resource keys: {list(resource.keys())}")
                if 'folder' in resource:
                    print(f" Folder info: {resource['folder']}")
                if 'file' in resource:
                    print(f" File info: {resource['file']}")
            print()
        # One classification pass shared by counts, samples and the folder filter.
        kinds = [_classify_drive_item(hit.get('resource', {})) for hit in results]
        print(f" 📄 Files: {kinds.count('file')}")
        print(f" 📁 Folders: {kinds.count('folder')}")
        print(f" ❓ Other: {kinds.count('other')}")
        print()
        print("📋 Sample results (showing first 5):")
        for i, (hit, kind) in enumerate(zip(results[:5], kinds[:5]), 1):
            resource = hit.get('resource', {})
            name = resource.get('name', 'No name')
            web_url = resource.get('webUrl', 'No URL')
            if kind == 'folder':
                item_type = "📁 FOLDER"
            elif kind == 'file':
                file_info = resource.get('file', {})
                mime_type = file_info.get('mimeType', 'Unknown type') if file_info else 'Detected by extension'
                item_type = f"📄 FILE ({mime_type})"
            else:
                item_type = "❓ UNKNOWN"
            print(f" {i}. {item_type} - {name}")
            print(f" Path: {_display_path(web_url)}")
            print(f" URL: {web_url}")
            print()
        if len(results) > 5:
            print(f" ... and {len(results) - 5} more results")
        folder_results = [hit for hit, kind in zip(results, kinds) if kind == 'folder']
        if folder_results:
            print(f"\n📁 FOLDER DETAILS ({len(folder_results)} folders found):")
            for i, hit in enumerate(folder_results, 1):
                resource = hit.get('resource', {})
                name = resource.get('name', 'No name')
                web_url = resource.get('webUrl', 'No URL')
                print(f" {i}. 📁 {name}")
                print(f" Path: {_display_path(web_url)}")
                print(f" URL: {web_url}")
                print()
        else:
            print(f"\n❌ No folders found in results - all {total} results are files or other types")
    except Exception as e:
        # Broad catch kept deliberately: network and JSON-decode failures in a
        # diagnostic script should print rather than crash the whole run.
        print(f"Exception: {str(e)}")
def main():
    """Entry point: run the direct folder listing, then a battery of searches.

    SECURITY FIX: the previous version embedded a live OAuth bearer token in
    the source file. Tokens are secrets and must never be committed; the
    token is now read from the GRAPH_ACCESS_TOKEN environment variable and
    the script exits with an error when it is missing.
    """
    access_token = os.environ.get("GRAPH_ACCESS_TOKEN", "")
    if not access_token:
        print("ERROR: set the GRAPH_ACCESS_TOKEN environment variable to a valid Microsoft Graph access token.")
        sys.exit(1)
    print("=" * 60)
    print("Microsoft Graph API Test Suite")
    print("=" * 60)
    # First test: direct folder listing (works better than search for enumeration)
    print("\nTEST 0: Direct Graph API folder listing")
    test_graph_folders_direct(access_token)
    # Query battery: mixes plain-text and kind:folder queries to compare how
    # search reports files vs. folders.
    test_queries = [
        # Venus folder (empty folder created for testing)
        "Venus",
        "kind:folder AND Venus",
        # Original specific query (found 8 results - all files)
        "Druckersteuerung AND Eskalation AND Logobject",
        # Broader single-term queries
        "Druckersteuerung",
        "Eskalation",
        "Logobject",
        # Folder-specific searches
        "kind:folder AND Druckersteuerung",
        "kind:folder AND Eskalation",
        # General folder search to see what folders exist
        "kind:folder",
    ]
    for i, query in enumerate(test_queries, 1):
        print(f"\nTEST {i}: {query}")
        test_graph_search(access_token, query)
        print()


if __name__ == "__main__":
    main()