196 lines
8.2 KiB
Python
196 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
||
import os, time, random, requests, logging, json
|
||
|
||
def _env_flag(name, default):
    """Return True when env var *name* (or *default* if unset) equals "true", ignoring case."""
    return os.getenv(name, default).lower() == "true"


# --- Behaviour switches -----------------------------------------------------
DEBUG = _env_flag("DEBUG", "true")                # verbose request/response tracing
VERIFY_SSL = _env_flag("VERIFY_SSL", "false")     # when False, TLS verification is disabled
LOOP_FOREVER = _env_flag("LOOP_FOREVER", "true")  # controls the main emulation loop

# --- Endpoints --------------------------------------------------------------
ADMIN_API_HOST = os.getenv("ADMIN_API_HOST", "http://localhost:8445")
CLIENT_API_HOST = os.getenv("CLIENT_API_HOST", "http://localhost:8080")

# --- Credentials ------------------------------------------------------------
ADMIN_EMAIL = os.getenv("ADMIN_EMAIL", "superadmin@eventhub.local")
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "123456")
BOT_PASSWORD = os.getenv("BOT_PASSWORD", "botpass123")  # one password shared by every bot login

# --- Timing (seconds) -------------------------------------------------------
MIN_DELAY = float(os.getenv("MIN_DELAY", "0.5"))   # lower bound of the pause between actions
MAX_DELAY = float(os.getenv("MAX_DELAY", "3.0"))   # upper bound of the pause between actions
BOT_REFRESH_INTERVAL = int(os.getenv("BOT_REFRESH_INTERVAL", "300"))  # max age of the bot cache
|
||
|
||
# Configure the root logger once: DEBUG mode lowers the threshold so the
# per-request traces written by log_request/log_response become visible.
logging.basicConfig(
    level=logging.DEBUG if DEBUG else logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
)
logger = logging.getLogger("emulator")
|
||
|
||
# Module-level mutable state, rebound through `global` statements elsewhere in this file.
admin_token = None     # cached admin API token; None until get_admin_token() logs in
bots_cache = []        # {"email": ..., "token": ...} dicts, rebuilt by refresh_bot_cache()
last_bot_refresh = 0   # time.time() of the last cache rebuild; 0 means "never"
|
||
|
||
def log_request(method, url, headers=None, json_data=None):
    """Write a debug trace of an outgoing HTTP request.

    Does nothing unless DEBUG is enabled. Secrets are masked before logging:
    the Authorization header is truncated, and credential-like fields of a
    dict body are replaced with "***".

    Args:
        method: HTTP verb, logged as-is.
        url: Target URL, logged as-is.
        headers: Optional mapping of request headers.
        json_data: Optional JSON-serialisable request body.
    """
    if not DEBUG:
        return
    logger.debug(f"--> {method} {url}")
    if headers:
        # Never print the full Authorization value, so the token is not leaked.
        # Header names are case-insensitive, hence the lower() comparison;
        # str() keeps the masking from failing on non-string values.
        safe_headers = {
            k: (str(v)[:20] + "..." if str(k).lower() == "authorization" else v)
            for k, v in headers.items()
        }
        logger.debug(f" Headers: {safe_headers}")
    # `is not None` rather than truthiness: an empty body such as {} is still
    # a body that was sent and should appear in the trace.
    if json_data is not None:
        secret_keys = ("password", "token", "refresh_token")
        if isinstance(json_data, dict):
            # Login and refresh payloads carry credentials; keep them out of the log.
            safe_body = {
                k: ("***" if str(k).lower() in secret_keys else v)
                for k, v in json_data.items()
            }
        else:
            safe_body = json_data
        logger.debug(f" Body: {json.dumps(safe_body)}")
|
||
|
||
def log_response(resp):
    """Write a debug trace of an HTTP response.

    Does nothing unless DEBUG is enabled. Logs the status code and URL, then
    the body: pretty-printed JSON when the body parses as JSON, otherwise the
    first 200 characters of the raw text. Any status other than 200 or 201 is
    additionally reported at WARNING level.

    Args:
        resp: The response object returned by ``requests.request``.
    """
    if not DEBUG:
        return
    logger.debug(f"<-- {resp.status_code} {resp.url}")
    try:
        body = resp.json()
    except ValueError:
        # A non-JSON body is reported by requests as a ValueError subclass.
        # Catching only that (instead of a bare except) lets KeyboardInterrupt
        # and SystemExit propagate normally.
        body_str = resp.text[:200]
    else:
        body_str = json.dumps(body, indent=2)
    logger.debug(f" Response: {body_str}")
    if resp.status_code not in (200, 201):
        logger.warning(f" Unexpected status {resp.status_code}: {body_str}")
|
||
|
||
# Seconds to wait for a server before giving up; override with REQUEST_TIMEOUT.
REQUEST_TIMEOUT = float(os.getenv("REQUEST_TIMEOUT", "30"))


def request(method, url, **kwargs):
    """Send an HTTP request through ``requests`` with logging around it.

    Args:
        method: HTTP verb.
        url: Target URL.
        **kwargs: Passed through to ``requests.request``. ``verify`` is forced
            to False when VERIFY_SSL is off; ``timeout`` defaults to
            REQUEST_TIMEOUT unless the caller supplies one.

    Returns:
        The ``requests.Response`` object.

    Raises:
        requests.exceptions.RequestException: on connection failure or timeout.
    """
    if not VERIFY_SSL:
        kwargs["verify"] = False
    # requests applies no timeout by default, so a stalled server would block
    # the emulator indefinitely. setdefault keeps an explicit caller value.
    kwargs.setdefault("timeout", REQUEST_TIMEOUT)
    log_request(method, url, headers=kwargs.get("headers"), json_data=kwargs.get("json"))
    resp = requests.request(method, url, **kwargs)
    log_response(resp)
    return resp
|
||
|
||
def get_admin_token():
    """Return the admin API token, logging in on first use.

    The token is stored in the module-level ``admin_token`` and reused on
    later calls; a login request is sent only while that cache is empty.

    Raises:
        requests.exceptions.HTTPError: if the login response is an error status.
    """
    global admin_token
    if not admin_token:
        login_url = f"{ADMIN_API_HOST}/v1/admin/login"
        credentials = {"email": ADMIN_EMAIL, "password": ADMIN_PASSWORD}
        reply = request(
            "POST",
            login_url,
            json=credentials,
            headers={"Content-Type": "application/json"},
        )
        reply.raise_for_status()
        admin_token = reply.json()["token"]
        logger.info("Admin token obtained")
    return admin_token
|
||
|
||
def fetch_bot_emails():
    """Return the e-mail addresses of all users whose role is "bot".

    Queries the admin user list (up to 10000 entries) with the cached admin
    token. If the server answers 401, the cached token is discarded, a fresh
    one is obtained and the query is retried once, so that an expired token
    does not break every later refresh.

    Returns:
        A list of e-mail strings.

    Raises:
        requests.exceptions.HTTPError: if the (possibly retried) query fails.
    """
    global admin_token
    url = f"{ADMIN_API_HOST}/v1/admin/users?limit=10000"
    resp = request("GET", url, headers={"Authorization": f"Bearer {get_admin_token()}"})
    if resp.status_code == 401:
        # Presumably the cached token expired or was revoked; get_admin_token()
        # only logs in again while the cache is empty, so clear it first.
        logger.info("Admin token rejected, re-authenticating")
        admin_token = None
        resp = request("GET", url, headers={"Authorization": f"Bearer {get_admin_token()}"})
    resp.raise_for_status()
    users = resp.json()
    emails = [u["email"] for u in users if u.get("role") == "bot"]
    logger.info(f"Fetched {len(emails)} bot emails (total users: {len(users)})")
    return emails
|
||
|
||
def login_bot(email):
    """Log a bot in through the client API and return its token.

    Args:
        email: The bot's e-mail address; the shared BOT_PASSWORD is used.

    Raises:
        requests.exceptions.HTTPError: if the login response is an error status.
    """
    login_url = f"{CLIENT_API_HOST}/v1/login"
    credentials = {"email": email, "password": BOT_PASSWORD}
    reply = request(
        "POST",
        login_url,
        json=credentials,
        headers={"Content-Type": "application/json"},
    )
    reply.raise_for_status()
    return reply.json()["token"]
|
||
|
||
def refresh_bot_cache():
    """Rebuild ``bots_cache`` by logging in every bot returned by the admin API.

    Bots whose login fails are skipped with a warning. The cache is replaced
    wholesale and ``last_bot_refresh`` is stamped with the current time.
    """
    global bots_cache, last_bot_refresh
    logged_in = []
    for address in fetch_bot_emails():
        try:
            bot_token = login_bot(address)
        except Exception as exc:
            # One failing bot must not abort the whole refresh.
            logger.warning(f"Could not login bot {address}: {exc}")
        else:
            logged_in.append({"email": address, "token": bot_token})
    bots_cache = logged_in
    last_bot_refresh = time.time()
    logger.info(f"Bot cache refreshed, {len(bots_cache)} bots ready")
|
||
|
||
def random_bot():
    """Return a randomly chosen entry of ``bots_cache``.

    The cache is rebuilt first when it is empty or older than
    BOT_REFRESH_INTERVAL seconds. While no bot is available the function
    waits 10 seconds and tries again, so it only returns once a bot exists.
    """
    while True:
        # Usable means: non-empty and not older than the refresh interval.
        cache_usable = bots_cache and (time.time() - last_bot_refresh) <= BOT_REFRESH_INTERVAL
        if not cache_usable:
            refresh_bot_cache()
        if not bots_cache:
            logger.warning("No bots available, retrying in 10 seconds...")
            time.sleep(10)
            continue
        return random.choice(bots_cache)
|
||
|
||
def random_sleep():
    """Pause for a random duration drawn uniformly between MIN_DELAY and MAX_DELAY seconds."""
    pause = random.uniform(MIN_DELAY, MAX_DELAY)
    time.sleep(pause)
|
||
|
||
def do_random_action(bot):
    """Perform one randomly selected client-API action on behalf of *bot*.

    An action number from 1 to 14 is drawn; each number maps to one request
    (or a list-then-act pair of requests) against CLIENT_API_HOST, sent with
    the bot's bearer token. Any exception raised while performing the action
    is logged at ERROR level and not propagated.

    Args:
        bot: A dict with "email" and "token" keys, as stored in ``bots_cache``.
    """
    action = random.randint(1, 14)
    headers = {"Authorization": f"Bearer {bot['token']}", "Content-Type": "application/json"}
    base = CLIENT_API_HOST

    def get(path):
        return request("GET", f"{base}{path}", headers=headers)

    def post(path, payload):
        return request("POST", f"{base}{path}", json=payload, headers=headers)

    # Actions that are a single fixed request with no follow-up.
    plain_gets = {
        2: "/v1/calendars",
        4: "/v1/search?q=test&limit=5",
        10: "/v1/user/me",
        11: "/v1/user/bookings",
        12: "/v1/user/reviews",
    }
    plain_posts = {
        8: ("/v1/tickets", {"error_message": "Emulated error", "stacktrace": "line 1"}),
        9: ("/v1/subscription", {"action": "start_trial"}),
        14: ("/v1/refresh", {"refresh_token": "dummy"}),
    }

    try:
        if action in plain_gets:
            get(plain_gets[action])
        elif action in plain_posts:
            path, payload = plain_posts[action]
            post(path, payload)
        elif action == 1:
            # Create a calendar and report its id when the server confirms creation.
            created = post("/v1/calendars", {"title": f"Cal-{random.randint(1,1000)}", "confirmation": "auto"})
            if created.status_code == 201:
                logger.debug(f"Bot {bot['email']} created calendar {created.json()['id']}")
        elif action in (3, 13):
            # Pick one of the listed calendars, then add an event (3) or view a month (13).
            listing = get("/v1/calendars")
            calendars = listing.json() if listing.status_code == 200 else None
            if calendars:
                cal = random.choice(calendars)
                if action == 3:
                    post(
                        f"/v1/calendars/{cal['id']}/events",
                        {"title": f"Event-{random.randint(1,1000)}", "start_time": "2027-01-01T10:00:00Z", "duration": 60},
                    )
                else:
                    get(f"/v1/calendars/{cal['id']}/view?month=2026-06")
        elif action in (5, 7):
            # Pick one event from the search results, then book it (5) or report it (7).
            found = get("/v1/search?type=event&limit=20")
            results = found.json().get("results") if found.status_code == 200 else None
            events = results.get("events", []) if results else None
            if events:
                ev = random.choice(events)
                if action == 5:
                    post(f"/v1/events/{ev['id']}/bookings", {})
                else:
                    post("/v1/reports", {"target_type": "event", "target_id": ev["id"], "reason": "Test"})
        elif action == 6:
            # Review the event behind one of the bot's own bookings.
            own = get("/v1/user/bookings")
            bookings = own.json() if own.status_code == 200 else None
            if bookings:
                booking = random.choice(bookings)
                post(
                    "/v1/reviews",
                    {"target_type": "event", "target_id": booking["event_id"], "rating": random.randint(1,5), "comment": "Nice!"},
                )
    except Exception as e:
        logger.error(f"Action {action} failed for {bot['email']}: {e}")
|
||
|
||
def main():
    """Run the emulator: build the bot cache, then perform random bot actions.

    With LOOP_FOREVER enabled, actions are performed endlessly with a random
    pause between them. With it disabled, exactly one action is performed
    (previously the loop body never ran, so a one-shot run did nothing).
    """
    logger.info("Starting user emulation")
    refresh_bot_cache()
    if not LOOP_FOREVER and not bots_cache:
        # random_bot() retries until a bot exists, which would turn a
        # one-shot run into an endless wait; stop here instead.
        logger.warning("No bots available, nothing to emulate")
    else:
        while True:
            bot = random_bot()
            do_random_action(bot)
            if not LOOP_FOREVER:
                break
            random_sleep()
    logger.info("Emulation finished")
|
||
|
||
# Run the emulator only when executed as a script, not when imported.
if __name__ == "__main__":
    main()