import json
import os
from datetime import datetime
from typing import Dict, Optional
from pr.core.logging import get_logger
# Module-level logger, obtained from the project's logging helper under the
# name "usage"; every method of this module logs through it.
logger = get_logger("usage")
# JSON file in the user's home directory that stores the per-request
# usage history (one list entry per tracked request).
USAGE_DB_FILE = os.path.expanduser("~/.assistant_usage.json")

# Pricing table keyed by model identifier. Each entry carries an 'input' and
# an 'output' rate; the cost calculation applies these per 1,000 tokens.
# Rows below are (model identifier, input rate, output rate).
MODEL_COSTS = {
    model_id: {'input': input_rate, 'output': output_rate}
    for model_id, input_rate, output_rate in (
        ('x-ai/grok-code-fast-1', 0.0, 0.0),
        ('gpt-4', 0.03, 0.06),
        ('gpt-4-turbo', 0.01, 0.03),
        ('gpt-3.5-turbo', 0.0005, 0.0015),
        ('claude-3-opus', 0.015, 0.075),
        ('claude-3-sonnet', 0.003, 0.015),
        ('claude-3-haiku', 0.00025, 0.00125),
    )
}
class UsageTracker:
    """Track token usage and estimated cost of model requests.

    Each tracked request updates the in-memory ``session_usage`` counters and
    is also appended to the JSON history file at ``USAGE_DB_FILE``.
    """

    # Upper bound on persisted history entries; the oldest are dropped first.
    MAX_HISTORY_ENTRIES = 10000

    def __init__(self):
        """Start a session with every counter at zero."""
        self.session_usage = {
            'requests': 0,
            'total_tokens': 0,
            'input_tokens': 0,
            'output_tokens': 0,
            'estimated_cost': 0.0,
            'models_used': {},
        }

    def track_request(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        total_tokens: Optional[int] = None
    ):
        """Record one request in the session counters and the history file.

        Args:
            model: Identifier of the model that served the request.
            input_tokens: Number of input tokens consumed.
            output_tokens: Number of output tokens produced.
            total_tokens: Total token count; defaults to
                ``input_tokens + output_tokens`` when omitted.
        """
        if total_tokens is None:
            total_tokens = input_tokens + output_tokens

        usage = self.session_usage
        usage['requests'] += 1
        usage['total_tokens'] += total_tokens
        usage['input_tokens'] += input_tokens
        usage['output_tokens'] += output_tokens

        model_usage = usage['models_used'].setdefault(
            model, {'requests': 0, 'tokens': 0, 'cost': 0.0}
        )
        model_usage['requests'] += 1
        model_usage['tokens'] += total_tokens

        cost = self._calculate_cost(model, input_tokens, output_tokens)
        model_usage['cost'] += cost
        usage['estimated_cost'] += cost

        # Persist the same total that was added to the session counters so
        # the history file and the session summary agree.
        self._save_to_history(
            model, input_tokens, output_tokens, cost, total_tokens=total_tokens
        )
        logger.debug(
            f"Tracked request: {model}, tokens: {total_tokens}, cost: ${cost:.4f}"
        )

    def _calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Return the estimated cost of a request using ``MODEL_COSTS``.

        Rates in ``MODEL_COSTS`` are applied per 1,000 tokens. The model is
        looked up by its exact identifier first. For a provider-qualified
        identifier such as ``provider/name`` the part after the last ``/`` is
        tried next, then (for backward compatibility) the part before the
        first ``/``. Unknown models are logged and priced at ``0.0``.
        """
        candidates = [model]
        if '/' in model:
            candidates.append(model.rsplit('/', 1)[-1])
            candidates.append(model.split('/', 1)[0])

        for name in candidates:
            costs = MODEL_COSTS.get(name)
            if costs is not None:
                break
        else:
            logger.warning(f"Unknown model for cost calculation: {model}")
            return 0.0

        input_cost = (input_tokens / 1000) * costs['input']
        output_cost = (output_tokens / 1000) * costs['output']
        return input_cost + output_cost

    def _save_to_history(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        cost: float,
        total_tokens: Optional[int] = None
    ):
        """Append one request record to the history file (best effort).

        The history is capped at ``MAX_HISTORY_ENTRIES`` records. Any failure
        is logged and swallowed so that usage tracking never breaks a request.

        Args:
            model: Identifier of the model that served the request.
            input_tokens: Number of input tokens consumed.
            output_tokens: Number of output tokens produced.
            cost: Estimated cost of the request.
            total_tokens: Total token count to persist; defaults to
                ``input_tokens + output_tokens`` when omitted.
        """
        if total_tokens is None:
            total_tokens = input_tokens + output_tokens

        # Write to a sibling temp file and swap it in, so a failure midway
        # through the dump cannot leave a truncated, unparseable history file.
        tmp_path = f"{USAGE_DB_FILE}.{os.getpid()}.tmp"
        try:
            try:
                with open(USAGE_DB_FILE, 'r', encoding='utf-8') as f:
                    history = json.load(f)
            except FileNotFoundError:
                history = []
            if not isinstance(history, list):
                raise ValueError("usage history file does not contain a JSON list")

            history.append({
                'timestamp': datetime.now().isoformat(),
                'model': model,
                'input_tokens': input_tokens,
                'output_tokens': output_tokens,
                'total_tokens': total_tokens,
                'cost': cost,
            })
            if len(history) > self.MAX_HISTORY_ENTRIES:
                history = history[-self.MAX_HISTORY_ENTRIES:]

            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(history, f, indent=2)
            os.replace(tmp_path, USAGE_DB_FILE)
        except Exception as e:
            logger.error(f"Error saving usage history: {e}")
            # Best-effort cleanup of a partially written temp file.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def get_session_summary(self) -> Dict:
        """Return a snapshot of the session counters.

        The nested per-model dictionaries are copied as well, so callers can
        modify the returned value without affecting the tracker.
        """
        summary = dict(self.session_usage)
        summary['models_used'] = {
            model: dict(stats)
            for model, stats in self.session_usage['models_used'].items()
        }
        return summary

    def get_formatted_summary(self) -> str:
        """Return the session counters as a human-readable multi-line string."""
        usage = self.session_usage
        lines = [
            "\n=== Session Usage Summary ===",
            f"Total Requests: {usage['requests']}",
            f"Total Tokens: {usage['total_tokens']:,}",
            f" Input: {usage['input_tokens']:,}",
            f" Output: {usage['output_tokens']:,}",
            f"Estimated Cost: ${usage['estimated_cost']:.4f}",
        ]
        if usage['models_used']:
            lines.append("\nModels Used:")
            for model, stats in usage['models_used'].items():
                lines.append(
                    f" {model}: {stats['requests']} requests, "
                    f"{stats['tokens']:,} tokens, ${stats['cost']:.4f}"
                )
        return '\n'.join(lines)

    @staticmethod
    def get_total_usage() -> Dict:
        """Return totals aggregated over the whole history file.

        Returns:
            A dict with ``total_requests``, ``total_tokens`` and
            ``total_cost``. All three are zero when the history file does not
            exist or cannot be read; the latter case is also logged.
        """
        empty = {
            'total_requests': 0,
            'total_tokens': 0,
            'total_cost': 0.0,
        }
        try:
            with open(USAGE_DB_FILE, 'r', encoding='utf-8') as f:
                history = json.load(f)
            return {
                'total_requests': len(history),
                'total_tokens': sum(entry['total_tokens'] for entry in history),
                'total_cost': sum(entry['cost'] for entry in history),
            }
        except FileNotFoundError:
            # Nothing has been tracked yet; this is not an error.
            return empty
        except Exception as e:
            logger.error(f"Error loading usage history: {e}")
            return empty