---
Type: Python script
Author: Markus Isaksson
Github URL: markus-school/markus-school.github.io/new/main/grok/
Keywords: [not added yet]
---
"""
Grok Optimized Integrated Tool v1.1.0
This module provides an optimized version of the integrated tool, designed exclusively for Grok (opencode) solo operation.
No collaboration with external models - all processing is handled internally by Grok's capabilities.
Improvements in v1.1.0:
- Enhanced TTLCache with LRU eviction for better memory management
- Added general-purpose DataManager for JSON config loading with relative paths
- Integrated AsyncCollaborationLogger for async logging
- Improved ConfigManager with YAML support and environment overrides
- Added ErrorContext and RetryableOperation for robust error handling
- Enhanced SymbolTable with type tracking
- Added general AI text generation utility for broader text processing
- Added OpencodeCLIRunner for seamless integration with opencode CLI v0.15.8
- Added WebDataProcessor for fetching and parsing external web content
- Optimized for 'Grok Code Fast 1' in opencode v0.15.8
- Added CLI enhancements with argparse for better usability, including web-fetch and math operations
- General code optimizations for broader applicability
- Reduced memory usage and optimized async patterns
Note: This tool is optimized for Grok's unique capabilities ('Grok Code Fast 1'
in opencode v0.15.8) and does not support multi-model collaboration.
"""
import ast
import re
import time
import random
import json
import os
import hashlib
import functools
import shutil
import builtins
from collections import Counter
from typing import List, Dict, Optional, Any, Protocol, TypeVar, Callable, Awaitable, TypedDict, Literal, Union
from abc import ABC, abstractmethod
import asyncio
from dataclasses import dataclass, field
from enum import Enum, auto
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import logging
import argparse
import sys
# Lazy imports for performance
try:
    import orjson
    _json_loads = orjson.loads
    def _json_dumps(obj, indent=4):
        # orjson only supports two-space indentation; any truthy indent maps to it
        option = orjson.OPT_INDENT_2 if indent else 0
        return orjson.dumps(obj, option=option).decode()
except ImportError:
    _json_loads = json.loads
    def _json_dumps(obj, indent=4):
        return json.dumps(obj, indent=indent)
try:
import aiofiles
except ImportError:
aiofiles = None
try:
import aiohttp
except ImportError:
aiohttp = None
# Additional lazy imports for heavy dependencies
try:
import yaml
except ImportError:
yaml = None
try:
import sympy as sp
from sympy.parsing.sympy_parser import parse_expr
except ImportError:
sp = None
parse_expr = None
# --- CLI Colors ---
class Colors:
"""ANSI color codes for CLI output."""
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
LIGHTGRAY = '\033[37m'
# --- Error Handling ---
class AnalysisError(Exception):
"""Custom exception for analysis errors."""
pass
class APIError(Exception):
"""Custom exception for API errors."""
pass
class ConfigurationError(Exception):
"""Custom exception for configuration errors."""
pass
# Precompiled regexes with named groups for efficiency
_DANGEROUS_PATTERNS = re.compile(
    r'(?P<eval>\beval\s*\()|(?P<exec>\bexec\s*\()|(?P<compile>\bcompile\s*\()'
    r'|(?P<pickle>pickle\.loads?\s*\()|(?P<dyn_import>__import__\s*\()'
    r'|(?P<os_system>os\.system\s*\()|(?P<subprocess>subprocess\.(?:call|Popen|run)\s*\()'
    r'|(?P<file_write>open\s*\([^)]*w[^)]*\))'
)
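# Illustrative: each match's `lastgroup` names the risky construct, e.g.
#   [m.lastgroup for m in _DANGEROUS_PATTERNS.finditer("eval(x); os.system(c)")]
#   -> ['eval', 'os_system']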
@functools.lru_cache(maxsize=256, typed=True)
def _cached_ast_parse(code: str):
return ast.parse(code)
# Setup logging
logging.basicConfig(filename='grok_analysis_metrics.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Type definitions for better Grok collaboration
T = TypeVar('T')
CodeType = Literal["function", "class", "module", "script"]
class AnalysisResult(TypedDict):
"""Structured analysis result for AI parsing"""
review: List[str]
ast_issues: List[str]
refactor_suggestions: List[str]
optimizations: List[str]
metrics: Dict[str, Union[int, float]]
confidence: float
grok_formatted: str
reasoning_steps: List[str]
class GrokCapability(Enum):
CODE_ANALYSIS = "code_analysis"
CODE_GENERATION = "code_generation"
TEXT_ANALYSIS = "text_analysis"
REASONING = "reasoning"
SECURITY_SCANNING = "security_scanning"
OPTIMIZATION = "optimization"
@dataclass
class GrokResponse:
content: str
confidence: float
metadata: Dict[str, Any]
reasoning_steps: List[str] = field(default_factory=list)
class GrokProcessor(ABC):
"""Interface for Grok's processing capabilities"""
@abstractmethod
async def analyze(self, input_data: str, context: Optional[Dict] = None) -> GrokResponse:
pass
@abstractmethod
def get_capabilities(self) -> List[GrokCapability]:
pass
class GrokAnalyzer(GrokProcessor):
"""Solo Grok analyzer optimized for internal processing"""
def __init__(self):
self.capabilities = [
GrokCapability.CODE_ANALYSIS,
GrokCapability.CODE_GENERATION,
GrokCapability.TEXT_ANALYSIS,
GrokCapability.REASONING,
GrokCapability.SECURITY_SCANNING,
GrokCapability.OPTIMIZATION
]
async def analyze(self, input_data: str, context: Optional[Dict] = None) -> GrokResponse:
"""Analyze input data using Grok's capabilities"""
# Simulate Grok's analysis with reasoning steps
reasoning_steps = [
"Step 1: Parse input and identify type",
"Step 2: Apply relevant analysis techniques",
"Step 3: Generate insights and recommendations",
"Step 4: Validate results and provide confidence"
]
# Basic analysis logic
if "def " in input_data or "class " in input_data:
content = "Code analysis: This appears to be Python code. Analyzing structure and potential improvements."
confidence = 0.95
elif "security" in input_data.lower():
content = "Security analysis: Scanning for vulnerabilities and best practices."
confidence = 0.90
else:
content = "General analysis: Processing text for insights and patterns."
confidence = 0.85
return GrokResponse(
content=content,
confidence=confidence,
metadata={"input_length": len(input_data), "analysis_type": "solo_grok"},
reasoning_steps=reasoning_steps
)
def get_capabilities(self) -> List[GrokCapability]:
return self.capabilities
# Enhanced ConfigManager with YAML support
class ConfigManager:
"""Manages configuration from YAML, environment, and defaults."""
def __init__(self, config_path: Optional[str] = "config.yaml"):
self.config_path = Path(config_path) if config_path else None
self.config = self._load_config()
def _load_config(self) -> dict:
"""Load config from defaults, file, and environment variables."""
config = self._load_defaults()
if self.config_path and self.config_path.exists():
try:
if yaml:
with open(self.config_path, 'r') as f:
file_config = yaml.safe_load(f)
if file_config:
config.update(file_config)
logging.info(f"Loaded configuration from {self.config_path}")
else:
logging.warning("yaml not available, skipping config file load")
except Exception as e:
logging.warning(f"Could not load or parse {self.config_path}: {e}")
self._load_from_env(config)
return config
def _load_defaults(self) -> dict:
return {
"max_function_length": 50,
"complexity_threshold": 10,
"temperature": 0.7,
"max_output_tokens": 2048,
"timeout": 30,
"retry_attempts": 3,
"retry_delay": 1.0,
"cache_size": 128,
"cache_ttl": 3600,
"enable_metrics": True,
"enable_streaming": True,
"max_context_tokens": 1000000
}
def _load_from_env(self, config: dict):
"""Override config with environment variables."""
for key in config:
env_var = f"GROK_TOOL_{key.upper()}"
if env_var in os.environ:
value = os.environ[env_var]
# Attempt to cast to the correct type
try:
original_type = type(config[key])
if original_type == bool:
config[key] = value.lower() in ('true', '1', 'yes')
else:
config[key] = original_type(value)
logging.info(f"Overrode '{key}' with value from {env_var}")
except (ValueError, TypeError):
logging.warning(f"Could not cast env var {env_var} to {original_type}")
def __getattr__(self, name):
return self.config.get(name.lower())
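# Illustrative: overrides follow the GROK_TOOL_<KEY> naming scheme used by
# _load_from_env above, e.g. `GROK_TOOL_TIMEOUT=60` replaces the default
# timeout of 30, and `GROK_TOOL_ENABLE_METRICS=false` disables metrics.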
config_manager = ConfigManager()
@dataclass(frozen=True)
class EnhancedConfig:
"""Immutable, validated configuration"""
MAX_FUNCTION_LENGTH: int = field(default=config_manager.max_function_length)
MIN_PASSWORD_LENGTH: int = field(default=8)
DEFAULT_NUM_THREADS: int = field(default=2) # Reduced for efficiency
KEY_SIZE: int = field(default=2048)
PUBLIC_EXPONENT: int = field(default=65537)
# AI Collaboration Settings
ENABLE_ASYNC: bool = field(default=True)
CACHE_TTL: int = field(default=config_manager.cache_ttl)
MAX_CACHE_SIZE: int = field(default=config_manager.cache_size)
# Grok-specific optimizations
GROK_CONTEXT_WINDOW: int = field(default=200000)
ENABLE_CHAIN_OF_THOUGHT: bool = field(default=True)
def __post_init__(self):
validations = [
(self.MAX_FUNCTION_LENGTH > 0, "MAX_FUNCTION_LENGTH must be positive"),
(self.KEY_SIZE >= 2048, "KEY_SIZE must be at least 2048 for security"),
(self.CACHE_TTL > 0, "CACHE_TTL must be positive"),
]
for condition, message in validations:
if not condition:
raise ValueError(message)
config = EnhancedConfig()
# Enhanced SymbolTable with type tracking
class SymbolTable:
"""Enhanced symbol table with type tracking"""
__slots__ = ('scopes', '_builtin_names')
def __init__(self):
self.scopes: List[Dict[str, Any]] = [{}]
        self._builtin_names = set(dir(builtins))
def define(self, name: str, node_type: Optional[type] = None) -> None:
"""Define a name with optional type information"""
self.scopes[-1][name] = {
'type': node_type,
'defined_at': datetime.now(),
'usage_count': 0
}
def is_builtin(self, name: str) -> bool:
"""Check if name is a builtin"""
return name in self._builtin_names
def get_type(self, name: str) -> Optional[type]:
"""Get the type of a defined name"""
for scope in reversed(self.scopes):
if name in scope:
return scope[name].get('type')
return None
def enter_scope(self):
"""Enter a new scope by adding a new dict to the scopes list."""
self.scopes.append({})
def exit_scope(self):
"""Exit the current scope by removing the last scope if not global."""
if len(self.scopes) > 1:
self.scopes.pop()
def is_defined(self, name):
"""Check if a name is defined in any scope, starting from current."""
for scope in reversed(self.scopes):
if name in scope:
return True
return False
# Enhanced TTLCache with LRU eviction
class TTLCache:
    """Time-to-live cache with expiration and least-recently-used eviction."""
    def __init__(self, ttl: int = 3600, maxsize: int = 128):
        self.ttl = ttl
        self.maxsize = maxsize
        self.cache = {}
        self.insert_times = {}  # When each key was stored, for TTL expiry
        self.last_used = {}     # When each key was last touched, for LRU eviction
    def _expired(self, key) -> bool:
        return time.time() - self.insert_times[key] >= self.ttl
    def _remove(self, key) -> None:
        del self.cache[key]
        del self.insert_times[key]
        del self.last_used[key]
    def __contains__(self, key):
        if key in self.cache:
            if not self._expired(key):
                return True
            self._remove(key)
        return False
    def __getitem__(self, key):
        if key in self.cache:
            if not self._expired(key):
                self.last_used[key] = time.time()
                return self.cache[key]
            self._remove(key)
        raise KeyError(key)
    def __setitem__(self, key, value):
        if key not in self.cache and len(self.cache) >= self.maxsize:
            # Evict the least recently used entry to stay within maxsize
            lru_key = min(self.last_used, key=self.last_used.get)
            self._remove(lru_key)
        now = time.time()
        self.cache[key] = value
        self.insert_times[key] = now
        self.last_used[key] = now
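# Minimal usage sketch for TTLCache (illustrative helper; not called at import
# time and not part of the tool's public API).
def _ttl_cache_example() -> None:
    cache = TTLCache(ttl=60, maxsize=2)
    cache['a'] = 1
    cache['b'] = 2
    time.sleep(0.01)
    _ = cache['a']   # touching 'a' makes 'b' the least recently used entry
    cache['c'] = 3   # at capacity, so 'b' is evicted
    assert 'a' in cache and 'b' not in cache and 'c' in cache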
# Async Collaboration Logger
class AsyncCollaborationLogger:
"""Async logger for better performance"""
def __init__(self):
self.metrics = {
'contributions': {},
'errors': {},
'response_times': [],
'model_preferences': {}
}
self._lock = asyncio.Lock()
async def log_contribution(self, model: str, action: str, metadata: Dict[str, Any] = None) -> None:
"""Log contribution with metadata"""
async with self._lock:
timestamp = datetime.now()
if model not in self.metrics['contributions']:
self.metrics['contributions'][model] = []
contribution = {
'action': action,
'timestamp': timestamp,
'metadata': metadata or {}
}
self.metrics['contributions'][model].append(contribution)
# OpencodeCLIRunner for v0.15.8 integration
class OpencodeCLIRunner:
"""Async runner for opencode CLI, optimized for Grok Code Fast 1 in v0.15.8."""
def __init__(self, timeout: int = 90):
self.opencodes_path = shutil.which('opencode')
self.timeout = timeout
if not self.opencodes_path:
logging.warning("Opencode CLI not found in PATH. CLI features disabled.")
else:
logging.info(f"Found opencode CLI at: {self.opencodes_path}")
async def run_opencode_async(self, prompt_parts: List[str]) -> Dict[str, Any]:
"""Run opencode CLI asynchronously for v0.15.8."""
if not self.opencodes_path:
return {"error": "Opencode CLI not available."}
cmd = [self.opencodes_path] + prompt_parts + ['--output-format', 'json']
logging.debug(f"Executing opencode command: {' '.join(cmd)}")
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=self.timeout)
stdout_str = stdout.decode('utf-8').strip()
stderr_str = stderr.decode('utf-8').strip()
if process.returncode != 0:
return {"error": f"Opencode CLI Error: {stderr_str}"}
if not stdout_str:
return {"error": "Empty output from opencode CLI."}
            # Extract the first JSON object or array from the CLI output
            match = re.search(r'[\{\[].*[\}\]]', stdout_str, re.DOTALL)
            if match:
                return json.loads(match.group(0))
            return {"error": "No JSON in output."}
except Exception as e:
return {"error": f"Error running opencode CLI: {str(e)}"}
# GrokLogger with async support
class GrokLogger:
"""Logger for tracking collaboration metrics and sessions."""
def __init__(self):
"""Initialize the logger with zeroed counters."""
self.start_time: Optional[float] = None
self.contributions: Dict[str, int] = {}
self.errors: Dict[str, int] = {}
self.async_logger = AsyncCollaborationLogger()
def start_session(self) -> None:
"""Start a new Grok analysis session and log it."""
self.start_time = time.time()
logging.info("Grok analysis session started")
def log_analysis(self, capability: str, action: str) -> None:
"""Log an analysis action by Grok."""
if capability not in self.contributions:
self.contributions[capability] = 0
self.contributions[capability] += 1
logging.info(f"Grok {capability}: {action}")
def log_error(self, capability: str, error: str) -> None:
"""Log an error in Grok processing."""
if capability not in self.errors:
self.errors[capability] = 0
self.errors[capability] += 1
logging.error(f"Grok {capability} error: {error}")
def end_session(self) -> None:
"""End the session and log summary statistics."""
if self.start_time is not None:
duration = time.time() - self.start_time
total_analyses = sum(self.contributions.values())
total_errors = sum(self.errors.values())
success_rate = (total_analyses / (total_analyses + total_errors)) * 100 if total_analyses + total_errors > 0 else 0
logging.info(f"Grok session ended. Duration: {duration:.2f}s. Analyses: {self.contributions}. Errors: {self.errors}. Success rate: {success_rate:.2f}%")
else:
logging.warning("Session end called without start")
# --- Performance Metrics ---
@dataclass
class APIMetrics:
"""Dataclass to track API performance metrics."""
request_id: str
start_time: datetime
end_time: Optional[datetime] = None
response_time: Optional[float] = None
tokens_used: Optional[int] = None
success: bool = False
error_message: Optional[str] = None
model_used: Optional[str] = None
class MetricsCollector:
"""Collects and reports performance metrics."""
def __init__(self):
self.metrics: List[APIMetrics] = []
def add_metric(self, metric: APIMetrics):
self.metrics.append(metric)
def get_summary(self) -> Dict[str, Any]:
"""Get summary of collected metrics."""
if not self.metrics:
return {"total_requests": 0}
successful_requests = [m for m in self.metrics if m.success]
failed_requests = [m for m in self.metrics if not m.success]
total_time = sum(m.response_time or 0 for m in successful_requests)
avg_response_time = total_time / len(successful_requests) if successful_requests else 0
return {
"total_requests": len(self.metrics),
"successful_requests": len(successful_requests),
"failed_requests": len(failed_requests),
"average_response_time": avg_response_time,
"total_tokens_used": sum(m.tokens_used or 0 for m in successful_requests)
}
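# Illustrative life-cycle of one metric record (values are hypothetical).
def _metrics_example() -> Dict[str, Any]:
    metric = APIMetrics(request_id="demo", start_time=datetime.now())
    metric.end_time = datetime.now()
    metric.response_time = 0.01
    metric.success = True
    collector = MetricsCollector()
    collector.add_metric(metric)
    return collector.get_summary()  # e.g. {'total_requests': 1, 'successful_requests': 1, ...}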
# General-purpose DataManager for JSON config loading
class DataManager:
"""Manages data loading from JSON files with relative paths."""
def __init__(self, base_dir: str):
self.base_dir = base_dir
self.loaded_data = {}
def load_json(self, relative_path: str, is_config: bool = False) -> Any:
"""Load JSON from relative path with error handling."""
absolute_path = os.path.join(self.base_dir, os.path.normpath(relative_path))
try:
with open(absolute_path, 'r', encoding='utf-8') as f:
return _json_loads(f.read())
except (FileNotFoundError, json.JSONDecodeError) as e:
log_func = logging.critical if is_config else logging.error
log_func(f"Failed to load JSON from {absolute_path}: {e}")
return None
def get_data(self, key: str, relative_path: str) -> Any:
"""Get data by key, loading if necessary."""
if key not in self.loaded_data:
self.loaded_data[key] = self.load_json(relative_path)
return self.loaded_data[key]
# ErrorContext for detailed error handling
class ErrorContext:
    """Context manager for detailed error handling"""
    def __init__(self, operation: str, logger: Optional[logging.Logger] = None):
        self.operation = operation
        self.logger = logger or logging.getLogger(__name__)
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            self.logger.error(
                f"Error in {self.operation}: {exc_type.__name__}: {exc_val}",
                exc_info=True
            )
            # Don't suppress the exception
            return False
        return True
# RetryableOperation decorator (defined before WebDataProcessor, which uses it)
class RetryableOperation:
    """Decorator that retries sync or async callables with exponential backoff"""
    def __init__(self, max_retries: int = 3, delay: float = 1.0):
        self.max_retries = max_retries
        self.delay = delay
    def __call__(self, func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            for attempt in range(self.max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == self.max_retries - 1:
                        raise
                    await asyncio.sleep(self.delay * (2 ** attempt))
        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            for attempt in range(self.max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == self.max_retries - 1:
                        raise
                    time.sleep(self.delay * (2 ** attempt))
        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
# WebDataProcessor for fetching external data
class WebDataProcessor:
    """Handles fetching and parsing of external data, e.g., for code analysis from web sources."""
    def __init__(self, config: ConfigManager):
        self.config = config
        if aiohttp is None:
            raise ConfigurationError("aiohttp is required for WebDataProcessor")
        self.semaphore = asyncio.Semaphore(config.max_concurrent or 10)
    @RetryableOperation(max_retries=config_manager.retry_attempts, delay=config_manager.retry_delay)
    async def fetch_url_content(self, url: str) -> str:
        """Fetches content from a URL with timeout."""
        async with self.semaphore:
            try:
                timeout = aiohttp.ClientTimeout(total=config_manager.timeout)
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    async with session.get(url) as response:
                        response.raise_for_status()
                        return await response.text()
            except Exception as e:
                logging.error(f"Error fetching {url}: {e}")
                raise
    async def analyze_external_code(self, url: str) -> Dict[str, Any]:
        """Analyzes code from an external URL."""
        content = await self.fetch_url_content(url)
        if not content:
            return {"error": "Failed to fetch content"}
        analyzer = SoloCodeAnalyzer()
        return {
            "review": analyzer.review_code(content),
            "ast_issues": analyzer.analyze_ast(content),
            "refactor_suggestions": analyzer.identify_refactor(content),
            "optimizations": analyzer.optimize_code(content),
        }
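# Illustrative sketch of the retry decorator defined above (hypothetical
# function; succeeds on the second attempt).
@RetryableOperation(max_retries=3, delay=0.01)
def _flaky_operation(attempts: List[int]) -> str:
    attempts.append(1)
    if len(attempts) < 2:
        raise ConnectionError("transient failure")
    return "ok"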
# General AI text generation utility (solo Grok optimized)
@RetryableOperation(max_retries=config_manager.retry_attempts, delay=config_manager.retry_delay)
def generate_ai_text(prompt: str) -> Optional[str]:
"""
Generates text using Grok's capabilities with retry logic.
Args:
prompt (str): The prompt to send to Grok.
Returns:
Optional[str]: The generated text, or None if generation fails.
"""
# Simulate Grok text generation (in solo mode, use internal logic)
if "code" in prompt.lower():
return "Here's some generated code based on your prompt."
elif "explain" in prompt.lower():
return "Explanation of the concept."
else:
return "General response from Grok."
# Enhanced CodeAnalyzer with features from ref1 and ref2
class SoloCodeAnalyzer:
"""Extracted subclass for code analysis functionality."""
def __init__(self):
"""Initialize the CodeAnalyzer with a symbol table."""
self.symbol_table = SymbolTable()
self._patterns = self._compile_patterns()
def _compile_patterns(self) -> Dict[str, re.Pattern]:
"""Pre-compile regex patterns for performance."""
        return {
            'magic_numbers': re.compile(r'(?<![\w.])\b\d{2,}\b'),
            'security_risks': re.compile(r'\b(?:eval|exec|compile|__import__)\s*\('),
        }
    def _get_cyclomatic_complexity(self, node: ast.AST) -> int:
"""Calculates cyclomatic complexity for a given AST node."""
complexity = 1
for sub_node in ast.walk(node):
if isinstance(sub_node, (ast.If, ast.For, ast.While, ast.ExceptHandler)):
complexity += 1
elif isinstance(sub_node, ast.BoolOp):
complexity += len(sub_node.values) - 1
return complexity
def _find_unused_imports(self, tree: ast.Module) -> List[str]:
"""Finds unused imports in the code."""
imported_names = set()
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
imported_names.add(alias.asname or alias.name)
elif isinstance(node, ast.ImportFrom):
for alias in node.names:
imported_names.add(alias.asname or alias.name)
used_names = {node.id for node in ast.walk(tree) if isinstance(node, ast.Name)}
unused = imported_names - used_names
return [f"Unused import: '{name}'" for name in sorted(list(unused))]
def review_code(self, code_string: str) -> List[str]:
"""Enhanced code review with multiple checks."""
if not isinstance(code_string, str) or not code_string.strip():
raise ValueError("Invalid code string provided for review.")
suggestions = []
try:
tree = ast.parse(code_string)
# Function length and complexity checks using list comprehension
function_nodes = [node for node in ast.walk(tree) if isinstance(node, ast.FunctionDef)]
for node in function_nodes:
length = (node.end_lineno or 0) - node.lineno
if length > config.MAX_FUNCTION_LENGTH:
suggestions.append(f"Function '{node.name}' is too long ({length} lines).")
complexity = self._get_cyclomatic_complexity(node)
if complexity > config_manager.complexity_threshold:
suggestions.append(f"Function '{node.name}' has high cyclomatic complexity ({complexity}).")
# Regex-based checks optimized
lines = code_string.splitlines()
for line_num, line in enumerate(lines, 1):
if self._patterns['magic_numbers'].search(line):
suggestions.append(f"Potential magic number found on line {line_num}.")
if self._patterns['security_risks'].search(line):
suggestions.append(f"Potential security risk (e.g., eval, exec) on line {line_num}.")
# Unused import check
suggestions.extend(self._find_unused_imports(tree))
except SyntaxError as e:
raise ValueError(f"Syntax error: {e}")
if 'print(' in code_string:
suggestions.append("Consider using logging instead of print for production code.")
        # Unused imports are already reported by the AST-based check above.
if not suggestions:
suggestions.append("Code looks good!")
return suggestions
def optimize_code(self, code_string: str) -> List[str]:
"""Suggest optimizations for the given code."""
if not isinstance(code_string, str) or not code_string.strip():
raise ValueError("Invalid code string provided for optimization.")
optimizations = []
if re.search(r'for \w+ in range\(len\(.+\)\):', code_string):
optimizations.append("Consider using list comprehensions or map/filter for loop-based operations.")
if re.search(r'%\s*\(' , code_string):
optimizations.append("Use f-strings (f'...') instead of % formatting for better readability.")
if re.search(r'\+=\s*["\']', code_string):
optimizations.append("String concatenation in loops is inefficient; consider using ''.join() for multiple strings.")
if not optimizations:
return ["No immediate optimization suggestions."]
return optimizations
def analyze_ast(self, code_string: str) -> List[str]:
"""Analyze code using AST with symbol table to find issues."""
if not isinstance(code_string, str) or not code_string.strip():
raise ValueError("Invalid code string provided for AST analysis.")
issues = []
try:
tree = ast.parse(code_string)
symbol_table = SymbolTable()
# First pass: Collect imported modules and names using comprehensions
import_nodes = [node for node in ast.walk(tree) if isinstance(node, (ast.Import, ast.ImportFrom))]
for node in import_nodes:
if isinstance(node, ast.Import):
for alias in node.names:
symbol_table.define(alias.name)
elif isinstance(node, ast.ImportFrom):
if node.module:
symbol_table.define(node.module)
for alias in node.names:
symbol_table.define(alias.name)
# Second pass: Analyze the rest
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
symbol_table.define(node.name)
symbol_table.enter_scope()
for arg in node.args.args:
symbol_table.define(arg.arg)
if len(node.body) == 0:
issues.append(f"Function {node.name} has no body.")
if not ast.get_docstring(node):
issues.append(f"Function '{node.name}' lacks a docstring. Add one for better documentation.")
if node.returns and not any(isinstance(n, ast.Return) for n in ast.walk(node)):
issues.append(f"Function '{node.name}' has return type annotation but no return statement.")
symbol_table.exit_scope()
elif isinstance(node, ast.Name):
if isinstance(node.ctx, ast.Load):
                        if not symbol_table.is_defined(node.id) and not symbol_table.is_builtin(node.id):
issues.append(f"Potential undefined variable: {node.id} at line {node.lineno}")
elif isinstance(node.ctx, ast.Store):
symbol_table.define(node.id)
except SyntaxError as e:
issues.append(f"Syntax error: {e}")
return issues
def identify_refactor(self, code: str) -> List[str]:
"""Identify refactoring opportunities in the code."""
if not isinstance(code, str) or not code.strip():
raise ValueError("Invalid code string provided for refactoring analysis.")
issues = []
lines = code.split('\n')
if "if" in code and "else" in code and len(lines) > 10:
issues.append("Long conditional - consider extracting function")
if "for" in code and "i" in code and "j" in code:
issues.append("Nested loops - optimize or simplify")
long_lines = [i+1 for i, line in enumerate(lines) if len(line) > 80]
if long_lines:
issues.append(f"Long lines detected at: {long_lines}. Consider breaking them up.")
        assignments = re.findall(r'\w+\s*=\s*[\w.]+', code)
        repeated = [stmt for stmt, count in Counter(assignments).items() if count > 1]
        if repeated:
            issues.append(f"Repeated assignments detected ({len(repeated)}); consider extracting them to a function.")
if_count = code.count('if ')
elif_count = code.count('elif ')
if if_count > 1 and elif_count == 0:
issues.append("Multiple if statements; consider using elif for chained conditions.")
return issues
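# Minimal usage sketch for SoloCodeAnalyzer (illustrative; the snippet and the
# expected finding are assumptions, not fixtures).
def _analyzer_example() -> List[str]:
    snippet = "def f(x):\n    return eval(x)\n"
    return SoloCodeAnalyzer().review_code(snippet)   # flags the eval() call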
# Async enhancements from ref2
class AsyncSoloCodeAnalyzer(SoloCodeAnalyzer):
"""Async-enabled analyzer for better performance"""
def __init__(self):
super().__init__()
self._cache: Dict[str, AnalysisResult] = {}
self._lock = asyncio.Lock()
async def async_review_code(self, code: str) -> AnalysisResult:
"""Async code review with caching"""
cache_key = hashlib.sha256(code.encode()).hexdigest()
async with self._lock:
if cache_key in self._cache:
return self._cache[cache_key]
# Run analyses in parallel
tasks = [
self._async_syntax_check(code),
self._async_complexity_analysis(code),
self._async_security_scan(code),
self._async_pattern_detection(code),
]
results = await asyncio.gather(*tasks, return_exceptions=True)
        # Map each parallel task to the matching result field (tasks are, in
        # order: syntax review, complexity, security scan, anti-patterns)
        review: List[str] = results[0] if not isinstance(results[0], BaseException) else ["Syntax check failed"]
        complexity_issues: List[str] = results[1] if not isinstance(results[1], BaseException) else []
        security_issues: List[str] = results[2] if not isinstance(results[2], BaseException) else []
        anti_patterns: List[str] = results[3] if not isinstance(results[3], BaseException) else []
        analysis_result = AnalysisResult(
            review=review + security_issues,
            ast_issues=complexity_issues,
            refactor_suggestions=anti_patterns,
            optimizations=[],
            metrics=await self._calculate_metrics(code),
            confidence=await self._calculate_confidence(results),
            grok_formatted="",
            reasoning_steps=[]
        )
async with self._lock:
self._cache[cache_key] = analysis_result
return analysis_result
async def _async_syntax_check(self, code: str) -> List[str]:
"""Async syntax checking"""
await asyncio.sleep(0) # Yield control
return self.review_code(code)
async def _async_complexity_analysis(self, code: str) -> List[str]:
"""Calculate cyclomatic complexity asynchronously"""
complexity = 0
try:
tree = _cached_ast_parse(code)
for node in ast.walk(tree):
if isinstance(node, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
complexity += 1
elif isinstance(node, ast.BoolOp):
complexity += len(node.values) - 1
except SyntaxError:
return ["Unable to analyze complexity due to syntax errors"]
if complexity > config_manager.complexity_threshold:
return [f"High complexity detected ({complexity}). Consider refactoring."]
return []
    async def _async_security_scan(self, code: str) -> List[str]:
        """Async security vulnerability scanning"""
        dangerous_messages = {
            "eval": "eval() is dangerous - arbitrary code execution risk",
            "exec": "exec() is dangerous - arbitrary code execution risk",
            "compile": "compile() can be used for code injection",
            "pickle": "pickle can execute arbitrary code",
            "dyn_import": "Dynamic imports can be a security risk",
            "os_system": "os.system() - consider subprocess instead",
            "subprocess": "subprocess calls can be dangerous if not sanitized",
            "file_write": "Writing to files can be risky",
        }
        issues = []
        for match in _DANGEROUS_PATTERNS.finditer(code):
            message = dangerous_messages.get(match.lastgroup, match.group(0))
            issues.append(f"SECURITY: Dangerous pattern detected: {message}")
        return sorted(set(issues))
async def _async_pattern_detection(self, code: str) -> List[str]:
"""Detect design patterns and anti-patterns"""
patterns = []
# Anti-patterns
if "global " in code:
patterns.append("ANTI-PATTERN: Global variable usage detected")
if re.search(r'except:\s*pass', code):
patterns.append("ANTI-PATTERN: Bare except with pass - hides errors")
if code.count("if ") > 10:
patterns.append("ANTI-PATTERN: Too many conditionals - consider polymorphism")
return patterns
async def _calculate_metrics(self, code: str) -> Dict[str, Union[int, float]]:
"""Calculate code metrics"""
lines = code.splitlines()
return {
"total_lines": len(lines),
"code_lines": len([l for l in lines if l.strip() and not l.strip().startswith("#")]),
"comment_lines": len([l for l in lines if l.strip().startswith("#")]),
"function_count": code.count("def "),
"class_count": code.count("class "),
"import_count": len([l for l in lines if l.strip().startswith(("import", "from"))]),
}
async def _calculate_confidence(self, results: List) -> float:
"""Calculate confidence score based on analysis results"""
base_confidence = 0.8
error_count = sum(1 for r in results if isinstance(r, Exception))
return max(0.1, base_confidence - (error_count * 0.2))
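# Illustrative: driving the async analyzer from synchronous code (sketch; the
# snippet is arbitrary).
def _async_review_example() -> AnalysisResult:
    return asyncio.run(AsyncSoloCodeAnalyzer().async_review_code("def f():\n    return 1\n"))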
# GrokMixin with formatting
class GrokMixin:
"""Mixin for AI model collaboration optimizations"""
def format_for_grok(self, data: Any) -> str:
"""Format data optimally for AI model processing"""
if isinstance(data, dict):
# Use markdown for better comprehension
return self._dict_to_markdown(data)
elif isinstance(data, list):
return "\n".join(f"- {item}" for item in data)
return str(data)
def _dict_to_markdown(self, d: Dict, level: int = 1) -> str:
"""Convert dict to markdown format"""
result = []
for key, value in d.items():
header = "#" * level
result.append(f"{header} {key}")
if isinstance(value, dict):
result.append(self._dict_to_markdown(value, level + 1))
elif isinstance(value, list):
for item in value:
result.append(f"- {item}")
else:
result.append(str(value))
return "\n".join(result)
async def grok_reasoning_steps(self, problem: str) -> List[str]:
"""Guide AI model through step-by-step reasoning"""
steps = [
f"## Problem Analysis\n{problem}",
"## Step 1: Understanding Requirements",
"- What are the inputs?",
"- What are the expected outputs?",
"- What are the constraints?",
"## Step 2: Algorithm Design",
"- What approach should we use?",
"- What data structures are needed?",
"## Step 3: Implementation",
"- Write the core logic",
"- Handle edge cases",
"## Step 4: Optimization",
"- Can we improve time complexity?",
"- Can we improve space complexity?",
]
return steps
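# Illustrative: format_for_grok renders nested dicts as markdown headings
# (sketch; _Demo is a throwaway class, not part of the tool).
def _markdown_format_example() -> str:
    class _Demo(GrokMixin):
        pass
    return _Demo().format_for_grok({"review": ["ok"], "metrics": {"lines": 1}})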
# AdaptiveSoloOptimizer
class AdaptiveSoloOptimizer:
"""Adaptive optimization that learns from processing patterns"""
def __init__(self):
self.optimization_history = []
self.pattern_cache = {}
self.performance_metrics = {
'avg_processing_time': 0,
'optimization_success_rate': 0,
'pattern_recognition_accuracy': 0
}
async def optimize_for_context(self, code: str, context: Dict) -> Dict:
"""Dynamically optimize based on detected patterns and history"""
# Detect code patterns
patterns = self._detect_patterns(code)
# Check if we've seen similar patterns before
pattern_key = self._generate_pattern_key(patterns)
if pattern_key in self.pattern_cache:
cached_strategy = self.pattern_cache[pattern_key]
return await self._apply_cached_strategy(code, cached_strategy)
# Learn and adapt
optimization_strategy = await self._learn_optimization_strategy(code, patterns)
self.pattern_cache[pattern_key] = optimization_strategy
return await self._apply_optimization(code, optimization_strategy)
def _detect_patterns(self, code: str) -> Dict:
"""Detect optimization-relevant patterns in code"""
return {
'has_loops': bool(re.search(r'for|while', code)),
'has_recursion': 'def' in code and code.count('(') > 2,
'complexity_level': self._estimate_complexity(code),
'data_structure_usage': self._detect_data_structures(code)
}
def _estimate_complexity(self, code: str) -> int:
"""Simple complexity estimation"""
return code.count('if ') + code.count('for ') + code.count('while ')
def _detect_data_structures(self, code: str) -> List[str]:
"""Detect data structures used"""
structures = []
if 'list(' in code or '[' in code:
structures.append('list')
if 'dict(' in code or '{' in code:
structures.append('dict')
return structures
def _generate_pattern_key(self, patterns: Dict) -> str:
"""Generate a key for pattern caching"""
return hashlib.sha256(str(sorted(patterns.items())).encode()).hexdigest()[:16]
async def _learn_optimization_strategy(self, code: str, patterns: Dict) -> Dict:
"""Learn optimization strategy based on patterns"""
strategy = {'optimizations': []}
if patterns['has_loops']:
strategy['optimizations'].append('vectorize_loops')
if patterns['complexity_level'] > 5:
strategy['optimizations'].append('reduce_complexity')
return strategy
async def _apply_cached_strategy(self, code: str, strategy: Dict) -> Dict:
"""Apply cached optimization strategy"""
return {'optimized_code': code, 'strategy': strategy}
async def _apply_optimization(self, code: str, strategy: Dict) -> Dict:
"""Apply learned optimizations"""
optimized = code
for opt in strategy.get('optimizations', []):
if opt == 'vectorize_loops':
optimized = re.sub(r'for \w+ in range\(len\((.+)\)\):', r'for \1_item in \1:', optimized)
return {'optimized_code': optimized, 'strategy': strategy}
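# Illustrative end-to-end pass through the adaptive optimizer (the snippet is
# an assumption; note the vectorize rewrite only touches the loop header).
def _optimizer_example() -> Dict:
    code = "for i in range(len(xs)):\n    print(xs[i])\n"
    return asyncio.run(AdaptiveSoloOptimizer().optimize_for_context(code, {}))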
# --- GrokAnalysisSuite ---
class GrokAnalysisSuite:
"""Houses new analysis tools adapted for Grok."""
def __init__(self, config: ConfigManager):
self.config = config
self.cache = TTLCache(ttl=config.cache_ttl, maxsize=config.cache_size)
def analyze_cefr(self, text: str) -> Dict[str, Any]:
"""Analyzes text for CEFR level and potential bias using Grok's reasoning."""
cache_key = hashlib.sha256(f"cefr_{text}".encode()).hexdigest()
if cache_key in self.cache:
return self.cache[cache_key]
# Simplified CEFR analysis for Grok
words = text.split()
avg_word_length = sum(len(w) for w in words) / len(words) if words else 0
complex_words = sum(1 for w in words if len(w) > 6)
complexity_ratio = complex_words / len(words) if words else 0
if complexity_ratio < 0.1:
level = "A1"
elif complexity_ratio < 0.2:
level = "A2"
elif complexity_ratio < 0.3:
level = "B1"
elif complexity_ratio < 0.4:
level = "B2"
else:
level = "C1"
explanation = f"Grok-estimated CEFR level: {level}. Based on word complexity ratio ({complexity_ratio:.2f})."
# Bias detection
warnings = []
male_words = ["he", "him", "his", "man", "men"]
female_words = ["she", "her", "hers", "woman", "women"]
male_count = sum(1 for w in text.lower().split() if w in male_words)
female_count = sum(1 for w in text.lower().split() if w in female_words)
if male_count + female_count > 0:
ratio = male_count / (female_count + 1)
if ratio > 3 or (female_count > 0 and ratio < 0.33):
warnings.append(f"Potential gender bias in language (ratio: {ratio:.2f})")
result = {"level": level, "explanation": explanation, "warnings": warnings}
self.cache[cache_key] = result
return result
def generate_source_criticism(self, text: str) -> Dict[str, List[str]]:
"""Generates source criticism questions for a given text using Grok's reasoning."""
cache_key = hashlib.sha256(f"criticism_{text}".encode()).hexdigest()
if cache_key in self.cache:
return self.cache[cache_key]
questions = [
"Who is the author and what is their purpose?",
"When was this text published and is it still relevant?",
"What evidence or sources are presented? Are they verifiable?",
"[Transparency] How openly is the information's origin disclosed?",
"[Classification] What categories or groups are created in the text? Is the division fair?"
]
warnings = []
sensitive_themes = ["race", "gender", "religion", "politics", "immigration"]
found_themes = [theme for theme in sensitive_themes if theme in text.lower()]
if found_themes:
warnings.append(f"Text touches on potentially sensitive themes: {', '.join(found_themes)}.")
questions.append(f"[Reflection] What perspectives on '{found_themes[0]}' might be missing?")
result = {"questions": questions, "warnings": warnings}
self.cache[cache_key] = result
return result
def generate_concept_map(self, text_input: str, output_path: str) -> str:
"""Generates a simple concept map using text-based representation."""
# Since we don't have matplotlib in Grok version, create a text-based map
G = {}
edge_labels = {}
pattern = re.compile(r'(.+?)\s*->\s*(.+?)\s*\[(.+?)\]')
for line in text_input.strip().split('\n'):
match = pattern.match(line.strip())
if match:
source, target, label = [s.strip() for s in match.groups()]
if source not in G:
G[source] = []
G[source].append(target)
edge_labels[(source, target)] = label
if not G:
raise ValueError("No valid relationships found in input. Format: 'Source -> Target [Label]'")
# Generate text-based concept map
map_text = "Concept Map:\n\n"
for source, targets in G.items():
map_text += f"{source}\n"
for target in targets:
label = edge_labels.get((source, target), "")
map_text += f" -> {target} [{label}]\n"
map_text += "\n"
with open(output_path, 'w', encoding='utf-8') as f:
f.write(map_text)
return f"Text-based concept map saved to {output_path}"
# SoloEnhancedTool
class SoloEnhancedTool(GrokMixin):
    """Main integrated tool class; mixes in GrokMixin for formatting and reasoning helpers."""
def __init__(self):
"""Initialize the SoloEnhancedTool with Grok logger and analyzer."""
print(f"{Colors.OKGREEN}Grok solo tool v1.1.0 initialized{Colors.ENDC}")
self.logger = GrokLogger()
self.analyzer = SoloCodeAnalyzer()
self.async_analyzer = AsyncSoloCodeAnalyzer()
self.grok_processor = GrokAnalyzer()
self.optimizer = AdaptiveSoloOptimizer()
self.suite = GrokAnalysisSuite(config_manager)
self.metrics_collector = MetricsCollector()
self.config = config
self.data_manager = DataManager(os.path.dirname(__file__))
self.opencode_runner = OpencodeCLIRunner()
try:
self.web_processor = WebDataProcessor(config_manager)
except ConfigurationError:
self.web_processor = None
logging.warning("WebDataProcessor not available")
self._session_context: Dict[str, Any] = {}
self.logger.start_session()
def generate_code(self, description: str) -> str:
"""Generate code based on a natural language description."""
if not isinstance(description, str) or not description.strip():
raise ValueError("Invalid description provided for code generation.")
desc_lower = description.lower()
if "palindrome" in desc_lower:
return """def is_palindrome(s: str) -> bool:
cleaned = ''.join(c.lower() for c in s if c.isalnum())
return cleaned == cleaned[::-1]"""
elif "fibonacci" in desc_lower:
return """def generate_fibonacci(n: int) -> list:
if n <= 0:
return []
fib = [0, 1]
while len(fib) < n:
fib.append(fib[-1] + fib[-2])
return fib[:n]"""
else:
return "# Code generation not implemented for this description."
def process_tasks(self, tasks: List[int], num_threads: int = config.DEFAULT_NUM_THREADS) -> List[str]:
"""Process a list of tasks using multi-threading."""
if not isinstance(tasks, list) or not all(isinstance(t, int) for t in tasks):
raise ValueError("Invalid tasks list provided.")
def process_task(task_id: int) -> str:
try:
time.sleep(random.uniform(0.1, 0.5))
return f"Task {task_id} completed"
except Exception as e:
return f"Task {task_id} failed: {e}"
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = {executor.submit(process_task, task): task for task in tasks}
results = []
for future in as_completed(futures):
result = future.result()
results.append(result)
return results
def classify_query(self, query: str) -> str:
"""Classify the type of query."""
if not isinstance(query, str) or not query.strip():
raise ValueError("Invalid query provided for classification.")
query_lower = query.lower()
if re.search(r'\b(code|program|script|function|class|variable|debug|error|fix|bug|issue)\b', query_lower):
return "coding"
elif re.search(r'\b(help|how|what|explain|describe|tutorial|guide)\b', query_lower):
return "informational"
elif re.search(r'\b(joke|funny|laugh|humor|pun|story)\b', query_lower):
return "entertainment"
elif re.search(r'\b(opinion|think|feel|review|like|dislike)\b', query_lower):
return "opinion"
elif re.search(r'\b(feature|request|add|implement|suggestion)\b', query_lower):
return "feature_request"
else:
return "general"
def analyze_sentiment(self, text: str) -> str:
"""Perform simple sentiment analysis on text."""
if not isinstance(text, str) or not text.strip():
raise ValueError("Invalid text provided for sentiment analysis.")
positive_words = {"good", "great", "awesome", "love", "excellent", "amazing", "fantastic", "wonderful", "happy", "pleased"}
negative_words = {"bad", "terrible", "hate", "awful", "worst", "horrible", "disappointing", "frustrated", "angry", "sad"}
text_set = set(text.lower().split())
positive_count = len(positive_words & text_set)
negative_count = len(negative_words & text_set)
if positive_count > negative_count:
return "positive"
elif negative_count > positive_count:
return "negative"
else:
return "neutral"
def generate_response(self, query: str, query_type: str, sentiment: str) -> str:
"""Generate a context-aware response based on query type and sentiment."""
if not all(isinstance(s, str) for s in [query, query_type, sentiment]):
raise ValueError("Invalid inputs provided for response generation.")
if query_type == "coding":
if sentiment == "positive":
return "Glad you're enjoying coding! What's your favorite language?"
else:
return "Coding can be tricky, but persistence pays off. What's the issue?"
elif query_type == "entertainment":
return "Humor is key! Why did the programmer quit? Because he didn't get arrays!"
elif query_type == "opinion":
return f"I appreciate your {sentiment} opinion on '{query}'. What's your take?"
elif query_type == "feature_request":
return f"Thanks for the {sentiment} feature request: '{query}'. I'll consider it for future improvements."
else:
return f"Got your {sentiment} query: '{query}'. How can I assist?"
def check_bias(self, text: str) -> List[str]:
"""Check for potential biases in the given text."""
if not isinstance(text, str) or not text.strip():
raise ValueError("Invalid text provided for bias check.")
biases = []
text_lower = text.lower()
if "always" in text_lower or "never" in text_lower:
biases.append("Absolute language detected – may indicate bias.")
if ("men" in text_lower and "women" not in text_lower) or ("women" in text_lower and "men" not in text_lower):
biases.append("Gender imbalance in language.")
if "best" in text_lower and "worst" in text_lower:
biases.append("Extreme comparisons – check for fairness.")
if "old" in text_lower and "young" not in text_lower:
biases.append("Potential age bias detected.")
if "white" in text_lower or "black" in text_lower:
biases.append("Potential racial bias detected.")
if not biases:
biases.append("No obvious biases detected.")
return biases
def analyze_context(self, user_input: str, history: List[str]) -> str:
"""Analyze the context from user input and history."""
if not isinstance(user_input, str) or not isinstance(history, list):
raise ValueError("Invalid inputs provided for context analysis.")
context = "general"
user_lower = user_input.lower()
if "code" in user_lower or "program" in user_lower or "script" in user_lower:
context = "coding"
if history and "error" in history[-1].lower():
context = "debugging"
if "optimize" in user_lower or "improve" in user_lower:
context = "optimization"
return context
def version_control_commit(self, code: str, message: str, repo_path: str = "generated_code_repo") -> None:
"""Commit code to a version control repository."""
if not isinstance(code, str) or not isinstance(message, str) or not isinstance(repo_path, str):
raise ValueError("Invalid inputs provided for version control commit.")
os.makedirs(repo_path, exist_ok=True)
version = len([f for f in os.listdir(repo_path) if f.startswith("version_")]) + 1
file_path = os.path.join(repo_path, f"version_{version}.py")
with open(file_path, 'w') as f:
f.write(code)
print(f"Committed version {version}: {message}")
def analyze_error(self, error_msg: str) -> str:
"""Analyze an error message with humorous explanations."""
if not isinstance(error_msg, str) or not error_msg.strip():
raise ValueError("Invalid error message provided for analysis.")
if "SyntaxError" in error_msg:
return "Syntax issue: Looks like your code has a wardrobe malfunction – missing colons or brackets? Check for typos!"
elif "NameError" in error_msg:
return "Undefined name: Did you forget to introduce your variable? It's like calling someone who isn't there."
elif "TypeError" in error_msg:
return "Type mismatch: You're trying to mix oil and water – incompatible types in your operation."
elif "ValueError" in error_msg:
return "Invalid value: That's not what I expected! The value is right type but wrong flavor."
elif "IndexError" in error_msg:
return "Index out of range: You're reaching for the cookie jar on the top shelf – index too high!"
elif "KeyError" in error_msg:
return "Key not found: Lost your keys again? That dictionary key doesn't exist."
elif "AttributeError" in error_msg:
return "AttributeError: Trying to teach a fish to fly? That object doesn't have that attribute."
else:
return "General error: Something's amiss, but I'm not sure what – time for a closer look!"
def load_json_file(self, filepath: str) -> Any:
"""Centralized utility to load JSON with robust error handling and basic schema validation."""
if not os.path.exists(filepath):
logging.warning(f"JSON file not found: {filepath}. Returning empty list.")
return []
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = _json_loads(f.read())
# Basic schema validation
if filepath.endswith('centralized_tasks.json') and not isinstance(data, dict):
logging.error(f"Invalid schema in {filepath}: Expected dict. Returning empty dict.")
return {}
elif filepath.endswith('_log.json') and not isinstance(data, list):
logging.error(f"Invalid schema in {filepath}: Expected list. Returning empty list.")
return []
return data
except Exception as e:
logging.error(f"Error loading JSON from {filepath}: {e}. Returning empty list.")
return []
def save_json_file(self, filepath: str, data: Any, indent: int = 4) -> None:
"""Centralized utility to save JSON with consistent formatting and error handling."""
        try:
            directory = os.path.dirname(filepath)
            if directory:  # Guard: makedirs('') raises for bare filenames
                os.makedirs(directory, exist_ok=True)
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(_json_dumps(data, indent=indent))
except Exception as e:
logging.error(f"Error writing to {filepath}: {e}")
def safe_json_load(self, file_path: str) -> Any:
"""Alias for load_json_file for backward compatibility."""
return self.load_json_file(file_path)
def safe_json_dump(self, data: Any, file_path: str) -> None:
"""Alias for save_json_file for backward compatibility."""
self.save_json_file(file_path, data)
def run_full_analysis(self, code: str) -> Dict[str, Any]:
"""Run a full analysis on the provided code."""
if not isinstance(code, str):
raise ValueError("Invalid code provided for full analysis.")
self.logger.log_analysis("code_analysis", "Running full analysis")
review = self.analyzer.review_code(code)
ast_issues = self.analyzer.analyze_ast(code)
refactor = self.analyzer.identify_refactor(code)
optimizations = self.analyzer.optimize_code(code)
return {
"review": review,
"ast_issues": ast_issues,
"refactor_suggestions": refactor,
"optimizations": optimizations
}
# Project analysis with batch processing
async def analyze_project_async(self, project_path: str, batch_size: int = 10) -> Dict[str, Any]:
"""Analyzes all Python files in a project directory with batch processing."""
project_path = Path(project_path)
if not project_path.is_dir():
raise ValueError(f"'{project_path}' is not a valid directory.")
logger.info(f"{Colors.OKBLUE}Analyzing project at: {project_path}{Colors.ENDC}")
all_results = {}
py_files = list(project_path.rglob("*.py"))
logger.info(f"Found {len(py_files)} Python files to analyze.")
# Use semaphore for concurrency control
sem = asyncio.Semaphore(10) # Limit to 10 concurrent analyses
async def sem_analyze(file_path):
async with sem:
return await self._analyze_file_async(file_path, project_path)
# Process in batches
for i in range(0, len(py_files), batch_size):
batch = py_files[i:i + batch_size]
logger.info(f"Processing batch {i//batch_size + 1} of {(len(py_files) + batch_size - 1)//batch_size}")
batch_results = await asyncio.gather(*(sem_analyze(f) for f in batch), return_exceptions=True)
for file_path, result in zip(batch, batch_results):
if isinstance(result, Exception):
logger.error(f"Could not analyze {file_path}: {result}")
else:
all_results[str(file_path.relative_to(project_path))] = result
return all_results
async def _analyze_file_async(self, file_path: Path, project_path: Path) -> List[str]:
"""Analyze a single file asynchronously."""
logger.info(f"Analyzing {file_path.relative_to(project_path)}...")
        if aiofiles is not None:
            async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
                code = await f.read()
        else:
            # Fall back to a synchronous read when aiofiles is unavailable
            code = file_path.read_text(encoding='utf-8')
        # Run static analysis
        return self.analyzer.review_code(code)
def analyze_project(self, project_path: str) -> Dict[str, Any]:
"""Synchronous wrapper for project analysis."""
return asyncio.run(self.analyze_project_async(project_path))
@RetryableOperation(max_retries=config_manager.retry_attempts, delay=config_manager.retry_delay)
async def grok_analysis_async(self, code: str, context: Optional[Dict[str, Any]] = None) -> AnalysisResult:
"""Async Grok analysis optimized for solo processing"""
request_id = hashlib.md5(f"{code}{time.time()}".encode()).hexdigest()[:8]
start_time = datetime.now()
metric = APIMetrics(request_id=request_id, start_time=start_time, model_used="Grok")
try:
with ErrorContext("grok_analysis"):
# Store context for session continuity
self._session_context.update(context or {})
# Perform async analysis
result = await self.async_analyzer.async_review_code(code)
# Apply adaptive optimization
optimization = await self.optimizer.optimize_for_context(code, self._session_context)
result["optimizations"].extend(optimization.get('strategy', {}).get('optimizations', []))
# Add Grok-specific formatting
result["grok_formatted"] = self.format_for_grok(result)
# Add reasoning steps for complex issues
if result["ast_issues"] or result["refactor_suggestions"]:
result["reasoning_steps"] = await self.grok_reasoning_steps(
f"How to fix: {result['ast_issues'][:3]}"
)
# Update metrics
metric.end_time = datetime.now()
metric.response_time = (metric.end_time - metric.start_time).total_seconds()
metric.success = True
self.metrics_collector.add_metric(metric)
return result
except Exception as e:
metric.end_time = datetime.now()
metric.response_time = (metric.end_time - metric.start_time).total_seconds()
metric.error_message = str(e)
metric.success = False
self.metrics_collector.add_metric(metric)
raise
# CLI Interface
def create_parser():
"""Create argument parser for CLI."""
parser = argparse.ArgumentParser(description="Grok Optimized Integrated Tool v1.1.0")
parser.add_argument("action", choices=["analyze", "generate", "optimize", "web-fetch", "math-solve"], help="Action to perform")
parser.add_argument("--code", help="Code to analyze or description for generation")
parser.add_argument("--file", help="File to analyze")
parser.add_argument("--output", help="Output file for results")
parser.add_argument("--url", help="URL for web-fetch action")
parser.add_argument("--expression", help="Math expression for math-solve")
parser.add_argument("--symbol", help="Symbol for math-solve", default="x")
return parser
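# Example invocations (illustrative; assumes this file is saved as grok_tool.py):
#   python grok_tool.py analyze --file my_module.py --output report.json
#   python grok_tool.py generate --code "function that checks palindromes"
#   python grok_tool.py math-solve --expression "x**2 - 4" --symbol x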
def main():
"""Main CLI entry point."""
parser = create_parser()
args = parser.parse_args()
tool = SoloEnhancedTool()
try:
if args.action == "analyze":
code = args.code
if args.file:
with open(args.file, 'r') as f:
code = f.read()
if not code:
print("No code provided for analysis.")
return
result = tool.run_full_analysis(code)
output = _json_dumps(result, indent=2)
elif args.action == "generate":
if not args.code:
print("No description provided for code generation.")
return
output = tool.generate_code(args.code)
elif args.action == "optimize":
code = args.code
if args.file:
with open(args.file, 'r') as f:
code = f.read()
if not code:
print("No code provided for optimization.")
return
result = tool.analyzer.optimize_code(code)
output = "\n".join(result)
elif args.action == "web-fetch":
if not args.url:
print("No URL provided for web-fetch.")
return
if tool.web_processor is None:
print("Web processor not available.")
return
content = asyncio.run(tool.web_processor.fetch_url_content(args.url))
output = content
elif args.action == "math-solve":
if not args.expression:
print("No expression provided for math-solve.")
return
if sp is None or parse_expr is None:
output = "SymPy not available for math solving."
else:
try:
x = sp.symbols(args.symbol)
expr = parse_expr(args.expression)
solution = sp.solve(expr, x)
output = str(solution)
except Exception as e:
output = f"Error solving: {e}"
if args.output:
with open(args.output, 'w') as f:
f.write(output)
print(f"Results saved to {args.output}")
else:
print(output)
except Exception as e:
print(f"Error: {e}")
finally:
tool.logger.end_session()
# Basic unit tests
def test_factorial(n) -> int:
"""Test function for factorial calculation with type checking."""
if not isinstance(n, int) or n < 0:
raise TypeError("Invalid input for factorial: must be non-negative integer.")
if n == 0:
return 1
return n * test_factorial(n - 1)
def run_tests() -> None:
"""Run basic unit tests for the toolkit."""
try:
# Test factorial
assert test_factorial(0) == 1
assert test_factorial(1) == 1
assert test_factorial(5) == 120
print("Factorial tests passed.")
# Test type mismatch
try:
test_factorial(3.5) # Should raise TypeError
print("Type test failed: Expected TypeError for float input.")
except TypeError:
print("Type test passed: Correctly raised TypeError for float input.")
except Exception as e:
print(f"Type test failed: Unexpected error: {e}")
try:
test_factorial(-1) # Should raise TypeError
print("Negative test failed: Expected TypeError for negative input.")
except TypeError:
print("Negative test passed: Correctly raised TypeError for negative input.")
except Exception as e:
print(f"Negative test failed: Unexpected error: {e}")
try:
test_factorial('a') # Should raise TypeError
print("String test failed: Expected TypeError for string input.")
except TypeError:
print("String test passed: Correctly raised TypeError for string input.")
except Exception as e:
print(f"String test failed: Unexpected error: {e}")
except AssertionError as e:
print(f"Factorial test failed: {e}")
except RecursionError:
print("Factorial test failed: Recursion depth exceeded.")
# Example usage
if __name__ == "__main__":
if len(sys.argv) > 1:
main()
else:
tool = SoloEnhancedTool()
sample_code = '''
def process_list(arr):
result = []
for i in range(len(arr)):
item = arr[i]
if item > 5:
result.append(item * 2)
print("Processing complete.")
return result
'''
analysis = tool.run_full_analysis(sample_code)
print("Grok Full Analysis:")
for key, value in analysis.items():
print(f"{key}: {value}")
run_tests()
tool.logger.end_session()