Tools and MCP Examples¶
This page collects real-world examples of RubyLLM tools and MCP client integrations, showing practical applications and advanced techniques you can adapt for your own workflows.
Real-World Tool Examples¶
File Processing Tools¶
Advanced Log Analyzer¶
```ruby
# ~/.aia/tools/log_analyzer.rb
require 'time'
require 'json'

class LogAnalyzer < RubyLLM::Tool
  description "Analyzes log files for patterns, errors, and performance metrics"

  def analyze_logs(log_file, time_range = "24h", error_threshold = 10)
    return "Log file not found: #{log_file}" unless File.exist?(log_file)

    logs = parse_log_file(log_file)
    filtered_logs = filter_by_time(logs, time_range)

    analysis = {
      total_entries: filtered_logs.length,
      error_count: count_errors(filtered_logs),
      warning_count: count_warnings(filtered_logs),
      top_errors: find_top_errors(filtered_logs, 5),
      performance_stats: calculate_performance_stats(filtered_logs),
      anomalies: detect_anomalies(filtered_logs),
      recommendations: generate_recommendations(filtered_logs, error_threshold)
    }

    JSON.pretty_generate(analysis)
  end

  def extract_error_patterns(log_file, pattern_limit = 10)
    return "Log file not found: #{log_file}" unless File.exist?(log_file)

    errors = []
    File.foreach(log_file) do |line|
      if line.match?(/ERROR|FATAL|EXCEPTION/i)
        errors << extract_error_context(line)
      end
    end

    patterns = group_similar_errors(errors)
    top_patterns = patterns.sort_by { |_, count| -count }.first(pattern_limit)

    {
      total_errors: errors.length,
      unique_patterns: patterns.length,
      top_patterns: top_patterns.map { |pattern, count|
        { pattern: pattern, occurrences: count, severity: assess_severity(pattern) }
      }
    }.to_json
  end

  def performance_report(log_file, metric = "response_time")
    logs = parse_log_file(log_file)
    performance_data = extract_performance_data(logs, metric)

    return "No performance data found for metric: #{metric}" if performance_data.empty?

    stats = calculate_detailed_stats(performance_data)
    percentiles = calculate_percentiles(performance_data)
    trends = analyze_trends(performance_data)

    {
      metric: metric,
      statistics: stats,
      percentiles: percentiles,
      trends: trends,
      alerts: generate_performance_alerts(stats, percentiles)
    }.to_json
  end

  private

  def parse_log_file(file_path)
    logs = []
    File.foreach(file_path) do |line|
      parsed = parse_log_line(line.strip)
      logs << parsed if parsed
    end
    logs
  end

  def parse_log_line(line)
    # Support multiple log formats
    formats = [
      /^(?<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<message>.*)/,
      /^(?<level>\w+) (?<timestamp>\w{3} \d{2} \d{2}:\d{2}:\d{2}) (?<message>.*)/,
      /^\[(?<timestamp>.*?)\] (?<level>\w+): (?<message>.*)/
    ]

    formats.each do |format|
      match = line.match(format)
      if match
        return {
          timestamp: parse_timestamp(match[:timestamp]),
          level: match[:level].upcase,
          message: match[:message],
          raw: line
        }
      end
    end

    nil # Unable to parse
  end

  def filter_by_time(logs, time_range)
    cutoff = case time_range
             when /(\d+)h/ then Time.now - ($1.to_i * 3600)
             when /(\d+)d/ then Time.now - ($1.to_i * 86400)
             when /(\d+)m/ then Time.now - ($1.to_i * 60)
             else Time.now - 86400 # Default: 24 hours
             end

    logs.select { |log| log[:timestamp] && log[:timestamp] > cutoff }
  end

  def count_errors(logs)
    logs.count { |log| ['ERROR', 'FATAL', 'CRITICAL'].include?(log[:level]) }
  end

  def count_warnings(logs)
    logs.count { |log| log[:level] == 'WARN' || log[:level] == 'WARNING' }
  end

  def find_top_errors(logs, limit)
    error_logs = logs.select { |log| ['ERROR', 'FATAL'].include?(log[:level]) }
    error_groups = error_logs.group_by { |log| normalize_error_message(log[:message]) }

    error_groups.map { |error, occurrences|
      {
        error: error,
        count: occurrences.length,
        first_seen: occurrences.map { |o| o[:timestamp] }.min,
        last_seen: occurrences.map { |o| o[:timestamp] }.max,
        sample: occurrences.first[:raw]
      }
    }.sort_by { |e| -e[:count] }.first(limit)
  end

  def detect_anomalies(logs)
    anomalies = []

    # Detect error spikes
    hourly_errors = group_by_hour(logs.select { |l| l[:level] == 'ERROR' })
    avg_errors = hourly_errors.values.sum.to_f / hourly_errors.length

    hourly_errors.each do |hour, count|
      if count > avg_errors * 3 # 3x average is anomalous
        anomalies << {
          type: 'error_spike',
          hour: hour,
          count: count,
          severity: 'high'
        }
      end
    end

    # Detect unusual silence periods
    if hourly_errors.values.any? { |count| count == 0 }
      anomalies << {
        type: 'unusual_silence',
        description: 'Periods with zero activity detected',
        severity: 'medium'
      }
    end

    anomalies
  end
end
```
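The class above leaves several private helpers as an exercise (`parse_timestamp`, `group_by_hour`, `normalize_error_message`, and the performance and recommendation helpers), so treat it as a template rather than a drop-in tool. A minimal smoke test of the public interface might look like the following sketch, assuming those helpers are implemented and the `ruby_llm` gem is installed; the log path is illustrative.

```ruby
# Hypothetical local smoke test for LogAnalyzer (run outside of aia).
# Assumes the elided private helpers have been implemented.
require 'ruby_llm'
require File.expand_path('~/.aia/tools/log_analyzer')

analyzer = LogAnalyzer.new
puts analyzer.extract_error_patterns('/var/log/myapp/production.log', 5)
puts analyzer.analyze_logs('/var/log/myapp/production.log', '12h')
```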
Configuration File Manager¶
```ruby
# ~/.aia/tools/config_manager.rb
require 'yaml'
require 'json'
require 'fileutils'

class ConfigManager < RubyLLM::Tool
  description "Manages configuration files across different formats (YAML, JSON, ENV)"

  def analyze_config(config_file)
    return "Config file not found: #{config_file}" unless File.exist?(config_file)

    format = detect_format(config_file)
    config_data = load_config(config_file, format)

    analysis = {
      file: config_file,
      format: format,
      structure: analyze_structure(config_data),
      security: security_analysis(config_data),
      completeness: completeness_check(config_data),
      recommendations: generate_config_recommendations(config_data, format)
    }

    JSON.pretty_generate(analysis)
  end

  def validate_config(config_file, schema_file = nil)
    return "Config file not found: #{config_file}" unless File.exist?(config_file)

    format = detect_format(config_file)
    config_data = load_config(config_file, format)

    validation_results = {
      syntax_valid: true,
      structure_issues: [],
      security_issues: [],
      recommendations: []
    }

    # Syntax validation
    begin
      load_config(config_file, format)
    rescue => e
      validation_results[:syntax_valid] = false
      validation_results[:structure_issues] << "Syntax error: #{e.message}"
    end

    # Security validation
    security_issues = find_security_issues(config_data)
    validation_results[:security_issues] = security_issues

    # Schema validation if provided
    if schema_file && File.exist?(schema_file)
      schema_validation = validate_against_schema(config_data, schema_file)
      validation_results[:schema_validation] = schema_validation
    end

    JSON.pretty_generate(validation_results)
  end

  def merge_configs(base_config, override_config, output_file = nil)
    base_format = detect_format(base_config)
    override_format = detect_format(override_config)

    base_data = load_config(base_config, base_format)
    override_data = load_config(override_config, override_format)

    merged_data = deep_merge(base_data, override_data)

    if output_file
      output_format = detect_format(output_file)
      save_config(merged_data, output_file, output_format)
      "Configuration merged and saved to: #{output_file}"
    else
      JSON.pretty_generate(merged_data)
    end
  end

  def extract_secrets(config_file, patterns = nil)
    content = File.read(config_file)

    default_patterns = [
      /password\s*[:=]\s*["']?([^"'\s]+)["']?/i,
      /api[_-]?key\s*[:=]\s*["']?([^"'\s]+)["']?/i,
      /secret\s*[:=]\s*["']?([^"'\s]+)["']?/i,
      /token\s*[:=]\s*["']?([^"'\s]+)["']?/i,
      /database_url\s*[:=]\s*["']?([^"'\s]+)["']?/i
    ]

    patterns ||= default_patterns
    secrets = []

    patterns.each do |pattern|
      content.scan(pattern) do |match|
        secrets << {
          type: detect_secret_type(pattern),
          value: mask_secret(match[0]),
          line: content.lines.find_index { |line| line.include?(match[0]) } + 1,
          severity: assess_secret_severity(match[0])
        }
      end
    end

    {
      file: config_file,
      secrets_found: secrets.length,
      secrets: secrets,
      recommendations: generate_secret_recommendations(secrets)
    }.to_json
  end

  private

  def detect_format(file_path)
    ext = File.extname(file_path).downcase
    case ext
    when '.yml', '.yaml' then 'yaml'
    when '.json' then 'json'
    when '.env' then 'env'
    when '.ini' then 'ini'
    else
      # Try to detect from content
      content = File.read(file_path).strip
      return 'json' if content.start_with?('{') || content.start_with?('[')
      return 'yaml' if content.match?(/^\w+:/)
      return 'env' if content.match?(/^\w+=/)
      'unknown'
    end
  end

  def load_config(file_path, format)
    content = File.read(file_path)
    case format
    when 'yaml'
      YAML.safe_load(content)
    when 'json'
      JSON.parse(content)
    when 'env'
      parse_env_file(content)
    else
      { raw_content: content }
    end
  end

  def parse_env_file(content)
    env_vars = {}
    content.lines.each do |line|
      line = line.strip
      next if line.empty? || line.start_with?('#')
      key, value = line.split('=', 2)
      env_vars[key] = value&.gsub(/^["']|["']$/, '') if key
    end
    env_vars
  end

  def deep_merge(base, override)
    base.merge(override) do |key, base_val, override_val|
      if base_val.is_a?(Hash) && override_val.is_a?(Hash)
        deep_merge(base_val, override_val)
      else
        override_val
      end
    end
  end
end
```
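When no output file is given, `merge_configs` only touches helpers that are fully defined above (`detect_format`, `load_config`, `deep_merge`), so it can be exercised directly. A minimal sketch, assuming the `ruby_llm` gem is installed and the config paths below exist in your project (they are placeholders):

```ruby
# Hypothetical direct use of ConfigManager outside of aia.
require 'ruby_llm'
require File.expand_path('~/.aia/tools/config_manager')

manager = ConfigManager.new

# Merge a base config with environment-specific overrides and print the
# merged result as pretty JSON (paths are illustrative).
puts manager.merge_configs('config/settings.yml', 'config/settings.production.yml')
```

The other public methods (`analyze_config`, `extract_secrets`) depend on helpers that are elided here, so they need those filled in before they will run.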
Development Tools¶
Code Quality Analyzer¶
```ruby
# ~/.aia/tools/code_quality.rb
class CodeQualityAnalyzer < RubyLLM::Tool
  description "Analyzes code quality metrics, complexity, and best practices"

  def analyze_codebase(directory, language = nil)
    return "Directory not found: #{directory}" unless Dir.exist?(directory)

    files = find_code_files(directory, language)
    return "No code files found" if files.empty?

    results = {
      summary: {
        total_files: files.length,
        total_lines: 0,
        languages: {}
      },
      quality_metrics: {},
      issues: [],
      recommendations: []
    }

    files.each do |file|
      file_analysis = analyze_file(file)
      results[:summary][:total_lines] += file_analysis[:line_count]

      lang = detect_language(file)
      results[:summary][:languages][lang] ||= 0
      results[:summary][:languages][lang] += 1

      results[:quality_metrics][file] = file_analysis
      results[:issues].concat(file_analysis[:issues])
    end

    results[:recommendations] = generate_recommendations(results)
    JSON.pretty_generate(results)
  end

  def calculate_complexity(file_path)
    return "File not found: #{file_path}" unless File.exist?(file_path)

    content = File.read(file_path)
    language = detect_language(file_path)

    complexity = case language
                 when 'ruby'
                   calculate_ruby_complexity(content)
                 when 'python'
                   calculate_python_complexity(content)
                 when 'javascript'
                   calculate_js_complexity(content)
                 else
                   calculate_generic_complexity(content)
                 end

    {
      file: file_path,
      language: language,
      cyclomatic_complexity: complexity[:cyclomatic],
      cognitive_complexity: complexity[:cognitive],
      maintainability_index: complexity[:maintainability],
      complexity_rating: rate_complexity(complexity[:cyclomatic])
    }.to_json
  end

  def check_best_practices(file_path)
    return "File not found: #{file_path}" unless File.exist?(file_path)

    content = File.read(file_path)
    language = detect_language(file_path)

    violations = []

    case language
    when 'ruby'
      violations.concat(check_ruby_practices(content))
    when 'python'
      violations.concat(check_python_practices(content))
    when 'javascript'
      violations.concat(check_js_practices(content))
    end

    # Generic checks
    violations.concat(check_generic_practices(content, file_path))

    {
      file: file_path,
      language: language,
      violations: violations,
      score: calculate_practice_score(violations),
      recommendations: prioritize_fixes(violations)
    }.to_json
  end

  private

  def find_code_files(directory, language = nil)
    extensions = if language
                   language_extensions(language)
                 else
                   %w[.rb .py .js .java .cpp .c .go .rs .php .cs .swift .kt]
                 end

    Dir.glob("#{directory}/**/*").select do |file|
      File.file?(file) && extensions.include?(File.extname(file).downcase)
    end
  end

  def analyze_file(file_path)
    content = File.read(file_path)
    lines = content.lines

    {
      file: file_path,
      line_count: lines.length,
      blank_lines: lines.count { |line| line.strip.empty? },
      comment_lines: count_comment_lines(content, detect_language(file_path)),
      complexity: calculate_complexity_metrics(content),
      issues: find_code_issues(content, file_path),
      maintainability: assess_maintainability(content)
    }
  end

  def calculate_ruby_complexity(content)
    # Simplified Ruby complexity calculation
    cyclomatic = 1 # Base complexity

    # Add complexity for control structures
    cyclomatic += content.scan(/\b(if|unless|while|until|for|case|rescue)\b/).length
    cyclomatic += content.scan(/&&|\|\|/).length
    cyclomatic += content.scan(/\?.*:/).length # Ternary operators

    # Method definitions add complexity
    method_count = content.scan(/def\s+\w+/).length

    {
      cyclomatic: cyclomatic,
      cognitive: calculate_cognitive_complexity(content, 'ruby'),
      maintainability: calculate_maintainability_index(content, cyclomatic),
      method_count: method_count
    }
  end

  def check_ruby_practices(content)
    violations = []

    # Check for long methods (>20 lines)
    methods = content.scan(/def\s+\w+.*?end/m)
    methods.each do |method|
      if method.lines.length > 20
        violations << {
          type: 'long_method',
          severity: 'medium',
          message: 'Method exceeds 20 lines',
          line: find_line_number(content, method)
        }
      end
    end

    # Check for deep nesting
    max_indent = content.lines.map { |line| line.match(/^\s*/)[0].length }.max
    if max_indent > 8
      violations << {
        type: 'deep_nesting',
        severity: 'medium',
        message: 'Excessive nesting detected',
        max_depth: max_indent / 2
      }
    end

    # Check for missing documentation
    if !content.match?(/^#.*/) && content.match?(/class\s+\w+/)
      violations << {
        type: 'missing_documentation',
        severity: 'low',
        message: 'Class lacks documentation'
      }
    end

    violations
  end

  def generate_recommendations(analysis_results)
    recommendations = []

    # File count recommendations
    if analysis_results[:summary][:total_files] > 100
      recommendations << "Consider organizing large codebase into modules or packages"
    end

    # Language diversity
    if analysis_results[:summary][:languages].keys.length > 3
      recommendations << "High language diversity may increase maintenance complexity"
    end

    # Quality-based recommendations
    high_complexity_files = analysis_results[:quality_metrics].select do |file, metrics|
      metrics[:complexity][:cyclomatic] > 10
    end

    if high_complexity_files.any?
      recommendations << "#{high_complexity_files.length} files have high complexity - consider refactoring"
    end

    recommendations
  end
end
```
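Like the other tools, this analyzer references helpers that are left as an exercise (`detect_language`, `count_comment_lines`, the cognitive-complexity and non-Ruby calculations). With those in place, a quick local run might look like the sketch below; the paths are illustrative.

```ruby
# Hypothetical local run of CodeQualityAnalyzer (assumes the elided helpers exist).
require 'ruby_llm'
require File.expand_path('~/.aia/tools/code_quality')

analyzer = CodeQualityAnalyzer.new
puts analyzer.analyze_codebase('./app', 'ruby')             # directory-wide summary
puts analyzer.check_best_practices('./app/models/user.rb')  # single-file practice check
```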
MCP Integration Examples¶
GitHub Repository Analyzer MCP¶
```python
# github_analyzer_mcp.py
import asyncio
import os
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
import aiohttp
import json
from datetime import datetime, timedelta

server = Server("github-analyzer", "1.0.0")

class GitHubAnalyzer:
    def __init__(self, token):
        self.token = token
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }

    async def analyze_repository(self, owner, repo):
        """Comprehensive repository analysis"""
        async with aiohttp.ClientSession() as session:
            # Get basic repo info
            repo_info = await self._get_repo_info(session, owner, repo)

            # Get commit activity
            commits = await self._get_commit_activity(session, owner, repo)

            # Get issues and PRs
            issues = await self._get_issues_analysis(session, owner, repo)
            prs = await self._get_pr_analysis(session, owner, repo)

            # Get contributors
            contributors = await self._get_contributors_analysis(session, owner, repo)

            # Get code quality indicators
            code_quality = await self._analyze_code_quality(session, owner, repo)

            return {
                'repository': repo_info,
                'activity': commits,
                'issues': issues,
                'pull_requests': prs,
                'contributors': contributors,
                'code_quality': code_quality,
                'health_score': self._calculate_health_score(repo_info, commits, issues, prs),
                'recommendations': self._generate_recommendations(repo_info, commits, issues, prs)
            }

    async def _get_repo_info(self, session, owner, repo):
        url = f'https://api.github.com/repos/{owner}/{repo}'
        async with session.get(url, headers=self.headers) as response:
            if response.status == 200:
                data = await response.json()
                return {
                    'name': data['name'],
                    'description': data.get('description', ''),
                    'language': data.get('language', 'Unknown'),
                    'stars': data['stargazers_count'],
                    'forks': data['forks_count'],
                    'open_issues': data['open_issues_count'],
                    'created_at': data['created_at'],
                    'updated_at': data['updated_at'],
                    'size': data['size'],
                    'license': data.get('license', {}).get('name', 'None') if data.get('license') else 'None'
                }
            return {}

    async def _get_commit_activity(self, session, owner, repo):
        # Get commits from last 30 days
        since = (datetime.now() - timedelta(days=30)).isoformat()
        url = f'https://api.github.com/repos/{owner}/{repo}/commits'
        params = {'since': since, 'per_page': 100}

        async with session.get(url, headers=self.headers, params=params) as response:
            if response.status == 200:
                commits = await response.json()

                # Analyze commit patterns
                daily_commits = {}
                authors = {}

                for commit in commits:
                    date = commit['commit']['author']['date'][:10]
                    author = commit['commit']['author']['name']

                    daily_commits[date] = daily_commits.get(date, 0) + 1
                    authors[author] = authors.get(author, 0) + 1

                return {
                    'total_commits_30d': len(commits),
                    'daily_average': len(commits) / 30,
                    'most_active_day': max(daily_commits.items(), key=lambda x: x[1]) if daily_commits else None,
                    'active_contributors': len(authors),
                    'top_contributor': max(authors.items(), key=lambda x: x[1]) if authors else None
                }
            return {}

    def _calculate_health_score(self, repo_info, commits, issues, prs):
        """Calculate overall repository health score (0-100)"""
        score = 0

        # Activity score (30 points)
        if commits.get('total_commits_30d', 0) > 10:
            score += 30
        elif commits.get('total_commits_30d', 0) > 5:
            score += 20
        elif commits.get('total_commits_30d', 0) > 0:
            score += 10

        # Documentation score (20 points)
        if repo_info.get('description'):
            score += 10
        # Additional checks would go here (README, wiki, etc.)

        # Community score (25 points)
        if repo_info.get('stars', 0) > 100:
            score += 15
        elif repo_info.get('stars', 0) > 10:
            score += 10
        elif repo_info.get('stars', 0) > 0:
            score += 5

        if issues.get('response_time_avg', float('inf')) < 7:  # Average response < 7 days
            score += 10

        # Maintenance score (25 points)
        last_update = datetime.fromisoformat(repo_info.get('updated_at', '1970-01-01T00:00:00Z').replace('Z', '+00:00'))
        days_since_update = (datetime.now(last_update.tzinfo) - last_update).days

        if days_since_update < 30:
            score += 25
        elif days_since_update < 90:
            score += 15
        elif days_since_update < 365:
            score += 5

        return min(score, 100)

@server.list_tools()
async def list_tools():
    return [
        Tool(
            name="analyze_repository",
            description="Analyze GitHub repository health, activity, and metrics",
            inputSchema={
                "type": "object",
                "properties": {
                    "owner": {"type": "string", "description": "Repository owner"},
                    "repo": {"type": "string", "description": "Repository name"}
                },
                "required": ["owner", "repo"]
            }
        ),
        Tool(
            name="compare_repositories",
            description="Compare multiple repositories across key metrics",
            inputSchema={
                "type": "object",
                "properties": {
                    "repositories": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "owner": {"type": "string"},
                                "repo": {"type": "string"}
                            }
                        }
                    }
                }
            }
        )
    ]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
    token = os.getenv('GITHUB_TOKEN')
    if not token:
        return [TextContent(type="text", text="Error: GITHUB_TOKEN environment variable not set")]

    analyzer = GitHubAnalyzer(token)

    if name == "analyze_repository":
        result = await analyzer.analyze_repository(arguments["owner"], arguments["repo"])
        return [TextContent(type="text", text=json.dumps(result, indent=2))]
    elif name == "compare_repositories":
        comparisons = []
        for repo_data in arguments["repositories"]:
            analysis = await analyzer.analyze_repository(repo_data["owner"], repo_data["repo"])
            comparisons.append({
                "repository": f"{repo_data['owner']}/{repo_data['repo']}",
                "analysis": analysis
            })
        return [TextContent(type="text", text=json.dumps(comparisons, indent=2))]

    return [TextContent(type="text", text=f"Unknown tool: {name}")]

async def main():
    # Serve over stdio so an MCP client (such as aia) can spawn this script as a subprocess.
    from mcp.server.stdio import stdio_server
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())

if __name__ == "__main__":
    asyncio.run(main())
```
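To sanity-check the data this analyzer works from, you can hit the same GitHub REST endpoint directly. A minimal Ruby sketch using only the standard library; the owner/repo values are placeholders and `GITHUB_TOKEN` is optional for public repositories:

```ruby
# Minimal sketch: fetch the same repository metadata the MCP server's
# _get_repo_info call reads, straight from the GitHub REST API.
require 'net/http'
require 'json'
require 'uri'

owner, repo = 'rails', 'rails' # placeholder repository
uri = URI("https://api.github.com/repos/#{owner}/#{repo}")

request = Net::HTTP::Get.new(uri)
request['Accept'] = 'application/vnd.github.v3+json'
request['Authorization'] = "token #{ENV['GITHUB_TOKEN']}" if ENV['GITHUB_TOKEN']

response = Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| http.request(request) }
data = JSON.parse(response.body)

puts({ name: data['name'], stars: data['stargazers_count'], forks: data['forks_count'],
       open_issues: data['open_issues_count'], language: data['language'] }.to_json)
```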
Database Schema Analyzer MCP¶
```python
# database_schema_mcp.py
import asyncio
import os
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
import asyncpg
import json
from datetime import datetime

server = Server("database-analyzer", "1.0.0")

class DatabaseAnalyzer:
    def __init__(self, connection_string):
        self.connection_string = connection_string

    async def analyze_schema(self, schema_name='public'):
        """Comprehensive database schema analysis"""
        conn = await asyncpg.connect(self.connection_string)
        try:
            # Get all tables
            tables = await self._get_tables(conn, schema_name)

            # Analyze each table
            table_analyses = {}
            for table in tables:
                table_analyses[table['table_name']] = await self._analyze_table(conn, schema_name, table['table_name'])

            # Get relationships
            relationships = await self._get_relationships(conn, schema_name)

            # Get indexes
            indexes = await self._get_indexes(conn, schema_name)

            # Performance analysis
            performance = await self._analyze_performance(conn, schema_name)

            return {
                'schema': schema_name,
                'tables': table_analyses,
                'relationships': relationships,
                'indexes': indexes,
                'performance': performance,
                'recommendations': self._generate_schema_recommendations(table_analyses, relationships, indexes)
            }
        finally:
            await conn.close()

    async def _get_tables(self, conn, schema_name):
        query = """
            SELECT table_name,
                   pg_total_relation_size(quote_ident(table_name)) as size_bytes
            FROM information_schema.tables
            WHERE table_schema = $1 AND table_type = 'BASE TABLE'
            ORDER BY table_name
        """
        return await conn.fetch(query, schema_name)

    async def _analyze_table(self, conn, schema_name, table_name):
        # Get columns
        columns_query = """
            SELECT column_name, data_type, is_nullable, column_default,
                   character_maximum_length, numeric_precision, numeric_scale
            FROM information_schema.columns
            WHERE table_schema = $1 AND table_name = $2
            ORDER BY ordinal_position
        """
        columns = await conn.fetch(columns_query, schema_name, table_name)

        # Get row count
        try:
            row_count = await conn.fetchval(f'SELECT COUNT(*) FROM {schema_name}.{table_name}')
        except Exception:
            row_count = 0
        # Get primary keys
        pk_query = """
            SELECT column_name
            FROM information_schema.table_constraints tc
            JOIN information_schema.key_column_usage kcu ON tc.constraint_name = kcu.constraint_name
            WHERE tc.table_schema = $1 AND tc.table_name = $2 AND tc.constraint_type = 'PRIMARY KEY'
        """
        primary_keys = await conn.fetch(pk_query, schema_name, table_name)

        return {
            'columns': [dict(col) for col in columns],
            'row_count': row_count,
            'primary_keys': [pk['column_name'] for pk in primary_keys],
            'data_quality': await self._assess_data_quality(conn, schema_name, table_name, columns)
        }

    async def _assess_data_quality(self, conn, schema_name, table_name, columns):
        quality_issues = []

        for column in columns:
            col_name = column['column_name']

            # Check for null values in non-nullable columns
            if column['is_nullable'] == 'NO':
                null_count = await conn.fetchval(
                    f'SELECT COUNT(*) FROM {schema_name}.{table_name} WHERE {col_name} IS NULL'
                )
                if null_count > 0:
                    quality_issues.append({
                        'type': 'unexpected_nulls',
                        'column': col_name,
                        'count': null_count
                    })

            # Check for duplicate values in potential key columns
            if 'id' in col_name.lower() or col_name.lower().endswith('_key'):
                duplicate_query = f"""
                    SELECT COUNT(*) FROM (
                        SELECT {col_name}, COUNT(*) as cnt
                        FROM {schema_name}.{table_name}
                        WHERE {col_name} IS NOT NULL
                        GROUP BY {col_name}
                        HAVING COUNT(*) > 1
                    ) duplicates
                """
                duplicate_count = await conn.fetchval(duplicate_query)
                if duplicate_count > 0:
                    quality_issues.append({
                        'type': 'duplicates',
                        'column': col_name,
                        'duplicate_groups': duplicate_count
                    })

        return quality_issues

@server.list_tools()
async def list_tools():
    return [
        Tool(
            name="analyze_schema",
            description="Analyze database schema structure, relationships, and quality",
            inputSchema={
                "type": "object",
                "properties": {
                    "schema_name": {"type": "string", "default": "public"}
                }
            }
        ),
        Tool(
            name="performance_analysis",
            description="Analyze database performance metrics and slow queries",
            inputSchema={
                "type": "object",
                "properties": {
                    "time_period": {"type": "string", "default": "1h"}
                }
            }
        ),
        Tool(
            name="suggest_indexes",
            description="Suggest database indexes based on query patterns",
            inputSchema={
                "type": "object",
                "properties": {
                    "table_name": {"type": "string"},
                    "schema_name": {"type": "string", "default": "public"}
                }
            }
        )
    ]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
    connection_string = os.getenv('DATABASE_URL')
    if not connection_string:
        return [TextContent(type="text", text="Error: DATABASE_URL environment variable not set")]

    analyzer = DatabaseAnalyzer(connection_string)

    try:
        if name == "analyze_schema":
            schema_name = arguments.get("schema_name", "public")
            result = await analyzer.analyze_schema(schema_name)
            return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
        elif name == "performance_analysis":
            # Implementation for performance analysis
            return [TextContent(type="text", text="Performance analysis not yet implemented")]
        elif name == "suggest_indexes":
            # Implementation for index suggestions
            return [TextContent(type="text", text="Index suggestions not yet implemented")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]

    return [TextContent(type="text", text=f"Unknown tool: {name}")]

async def main():
    # Serve over stdio so an MCP client can spawn this script as a subprocess.
    from mcp.server.stdio import stdio_server
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())

if __name__ == "__main__":
    asyncio.run(main())
```
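The schema and column queries above run against PostgreSQL's information_schema, so they are easy to prototype from Ruby as well. A small sketch using the pg gem, mirroring the `_get_tables` query; the connection string and schema name are placeholders:

```ruby
# Minimal sketch: list tables in a schema from Ruby, using the same
# information_schema query the MCP server's _get_tables method runs.
require 'pg'

conn = PG.connect(ENV.fetch('DATABASE_URL')) # e.g. postgres://user:pass@localhost/mydb
tables = conn.exec_params(<<~SQL, ['public'])
  SELECT table_name
  FROM information_schema.tables
  WHERE table_schema = $1 AND table_type = 'BASE TABLE'
  ORDER BY table_name
SQL

tables.each { |row| puts row['table_name'] }
conn.close
```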
Integration Workflows¶
Full-Stack Application Analysis¶
```text
# ~/.prompts/full_stack_analysis.txt
//tools file_analyzer.rb,code_quality.rb,config_manager.rb
//mcp github,filesystem,database
# Full-Stack Application Analysis
Application: <%= app_name %>
Repository: <%= repo_url %>
Environment: <%= environment %>
## Phase 1: Repository Analysis
Using GitHub MCP client:
1. Repository health and activity metrics
2. Issue and PR management effectiveness
3. Contributor activity and code review patterns
4. Release and deployment frequency
## Phase 2: Codebase Quality Assessment
Using code analysis tools:
1. Code quality metrics across all languages
2. Complexity analysis and refactoring opportunities
3. Security vulnerability scanning
4. Test coverage and quality assessment
## Phase 3: Configuration Management
Using configuration tools:
1. Configuration file analysis and security
2. Environment-specific settings validation
3. Secret management assessment
4. Deployment configuration review
## Phase 4: Database Architecture
Using database MCP client:
1. Schema design and normalization analysis
2. Index optimization opportunities
3. Query performance analysis
4. Data integrity and quality assessment
## Phase 5: File System Organization
Using filesystem MCP client:
1. Project structure and organization
2. Build and deployment artifacts
3. Documentation completeness
4. Security file analysis
## Integration Report
Cross-analyze findings to provide:
- Overall application health score
- Security risk assessment
- Performance optimization priorities
- Maintenance burden analysis
- Deployment readiness checklist
- Prioritized improvement recommendations
Generate comprehensive analysis with actionable insights for each identified area.
```
DevOps Pipeline Assessment¶
```text
# ~/.prompts/devops_pipeline_analysis.txt
//tools log_analyzer.rb,config_manager.rb
//mcp github,filesystem
# DevOps Pipeline Analysis
Project: <%= project_name %>
Pipeline type: <%= pipeline_type %>
## CI/CD Configuration Analysis
Using configuration tools:
1. Build configuration validation (GitHub Actions, Jenkins, etc.)
2. Deployment script analysis and security
3. Environment configuration consistency
4. Secret management in CI/CD
## Pipeline Performance Analysis
Using log analysis tools:
1. Build time trends and optimization opportunities
2. Failure rate analysis and common failure patterns
3. Deployment frequency and success rates
4. Resource utilization during builds
## Repository Integration Assessment
Using GitHub MCP:
1. Branch protection rules and policies
2. Automated testing integration
3. Code review automation
4. Release management processes
## Infrastructure as Code Review
Using filesystem MCP:
1. Terraform/CloudFormation template analysis
2. Docker configuration optimization
3. Kubernetes manifest validation
4. Infrastructure security assessment
## Recommendations
Generate prioritized recommendations for:
- Pipeline speed improvements
- Security enhancements
- Reliability improvements
- Cost optimization opportunities
- Automation enhancement suggestions
Provide implementation timeline and impact assessment for each recommendation.
```
Advanced Integration Patterns¶
Multi-Environment Consistency Checker¶
```ruby
# ~/.aia/tools/environment_checker.rb
class EnvironmentChecker < RubyLLM::Tool
  description "Compares configurations and deployments across multiple environments"

  def compare_environments(environments_config)
    environments = JSON.parse(environments_config)
    comparison_results = {}

    environments.each do |env_name, config|
      comparison_results[env_name] = analyze_environment(env_name, config)
    end

    # Cross-environment analysis
    consistency_report = analyze_consistency(comparison_results)
    drift_analysis = detect_configuration_drift(comparison_results)

    {
      environments: comparison_results,
      consistency: consistency_report,
      drift: drift_analysis,
      recommendations: generate_consistency_recommendations(consistency_report, drift_analysis)
    }.to_json
  end

  def validate_deployment_readiness(environment, checklist_items = nil)
    default_checklist = [
      'configuration_files_present',
      'secrets_configured',
      'database_migrations_applied',
      'dependencies_installed',
      'health_checks_passing',
      'monitoring_configured',
      'backup_procedures_verified'
    ]

    checklist = checklist_items || default_checklist
    results = {}

    checklist.each do |item|
      results[item] = check_deployment_item(environment, item)
    end

    readiness_score = calculate_readiness_score(results)
    blocking_issues = identify_blocking_issues(results)

    {
      environment: environment,
      readiness_score: readiness_score,
      checklist_results: results,
      blocking_issues: blocking_issues,
      deployment_recommended: blocking_issues.empty? && readiness_score > 80
    }.to_json
  end

  private

  def analyze_environment(env_name, config)
    # Analyze single environment
    {
      name: env_name,
      config_files: find_config_files(config['path']),
      services: check_services(config['services']),
      database: check_database_connection(config['database']),
      monitoring: check_monitoring(config['monitoring']),
      last_deployment: get_last_deployment_info(env_name)
    }
  end

  def analyze_consistency(environments)
    consistency_issues = []

    # Compare configuration structures
    config_structures = environments.map { |env, data| data[:config_files] }
    unless config_structures.all? { |structure| structure.keys.sort == config_structures.first.keys.sort }
      consistency_issues << "Configuration file structures differ between environments"
    end

    # Compare service configurations
    service_configs = environments.map { |env, data| data[:services] }
    unless service_configs.all? { |config| config.keys.sort == service_configs.first.keys.sort }
      consistency_issues << "Service configurations differ between environments"
    end

    {
      consistent: consistency_issues.empty?,
      issues: consistency_issues,
      score: calculate_consistency_score(consistency_issues)
    }
  end
end
```
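`compare_environments` expects a JSON string keyed by environment name, where each entry carries the `path`, `services`, `database`, and `monitoring` values that `analyze_environment` reads. A hedged sketch of how it might be called once the elided check helpers are implemented; the environment structure below is an assumption based on the keys the code accesses:

```ruby
# Hypothetical invocation of EnvironmentChecker (check helpers assumed implemented).
require 'ruby_llm'
require 'json'
require File.expand_path('~/.aia/tools/environment_checker')

environments = {
  'staging'    => { 'path' => 'config/staging',    'services' => ['web', 'worker'],
                    'database' => ENV['STAGING_DATABASE_URL'],    'monitoring' => 'datadog' },
  'production' => { 'path' => 'config/production', 'services' => ['web', 'worker'],
                    'database' => ENV['PRODUCTION_DATABASE_URL'], 'monitoring' => 'datadog' }
}

checker = EnvironmentChecker.new
puts checker.compare_environments(environments.to_json)
puts checker.validate_deployment_readiness('staging')
```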
Related Documentation¶
- Tools Integration - Detailed tool development guide
- MCP Integration - MCP client development and usage
- Advanced Prompting - Complex integration patterns
- Configuration - Tool and MCP configuration
- Examples Directory - Additional examples and templates
These examples demonstrate the power of combining RubyLLM tools with MCP clients to create sophisticated analysis and automation workflows. Use them as templates and inspiration for building your own integrated solutions!