import sys
import re
import os
import subprocess
import shutil
# Games offered in the interactive menu, keyed by the number the user types.
#   name       - label shown in the menu
#   id         - Steam App ID written into every `workshop_download_item` line
#   login_info - steamcmd login command text; it is passed to steamcmd with a
#                leading "+" prepended
GAME_CANDIDATES = {
    0: {
        "name": "Wallpaper Engine",
        "id": "431960",
        "login_info": "login a b",
    },
    1: {
        "name": "Stellaris",
        "id": "281990",
        "login_info": "login a b",
    },
    2: {
        "name": "RimWorld",
        "id": "294100",
        "login_info": "login a b",
    },
    3: {
        "name": "Hearts of Iron IV",
        "id": "394360",
        "login_info": "login a b",
    },
}

# File names offered as defaults at the interactive prompts.
DEFAULT_INPUT_FILENAME = "input.txt"
DEFAULT_OUTPUT_FILENAME = "workshop_script.txt"

# Receives the combined stdout/stderr of the steamcmd run.
LOG_FILENAME = "steamcmd_output.log"

# Workshop locations, relative to the current working directory.
WORKSHOP_BASE_PATH = "steamapps/workshop/content"
STEAMAPPS_WORKSHOP_DIR = "steamapps/workshop"
def display_initial_instructions():
    """Print the browser-console JavaScript snippets used to collect workshop IDs.

    Two snippets are shown: one for a collection page and one for the
    subscribed-items list. Output goes to stdout; nothing is returned.
    """
    instruction_lines = (
        "--- How to Fetch Workshop IDs ---",
        "Run these commands in your browser's developer console on the Steam Workshop page.",
        "\nFor a collection:",
        "[...document.querySelectorAll('div.collectionItemDetails > a')].map(item => item.href.split('=')[1])",
        "\nFor your subscribed items list:",
        "[...document.querySelectorAll('div.workshopItemSubscriptionDetails > a')].map(item => item.href.split('=')[1])",
        "-" * 40,
    )
    for text in instruction_lines:
        print(text)
def display_welcome():
    """Print the welcome banner followed by one menu line per configured game.

    Each menu line shows the selection number, the game's name and its
    Steam App ID, in the order the games appear in GAME_CANDIDATES.
    """
    banner = (
        "\n--- Steam Workshop Interactive Downloader & Verifier ---",
        "This script will format IDs, run SteamCMD, and verify the downloads.",
        "\nAvailable Games:",
    )
    for text in banner:
        print(text)

    for number in GAME_CANDIDATES:
        entry = GAME_CANDIDATES[number]
        print(f" [{number}] -> {entry['name']} (ID: {entry['id']})")

    print("-" * 40)
def parse_universal_input_file(filename):
    """Extract workshop item IDs from an input file, auto-detecting its format.

    Formats are tried in this order; the first one that matches wins:

    1. steamcmd script: lines of ``workshop_download_item <app_id> <item_id>``.
       All lines must share the same App ID.
    2. Quoted ID array: digit runs wrapped in double or single quotes, such
       as the array printed by a browser console.
    3. Plain ID list: bare digit runs, such as the ``- <id>`` list of missing
       items printed by the verification step.

    Args:
        filename: Path of the file to read.

    Returns:
        A tuple ``(app_id, item_ids)``:
        - ``(str, set[str])`` for a steamcmd script,
        - ``(None, set[str])`` for the two ID-only formats,
        - ``(None, None)`` if the file cannot be read, contains no IDs, or
          is a script mixing several App IDs (an error is printed).
    """
    try:
        # Only ASCII digits and keywords matter, so undecodable bytes are
        # replaced rather than allowed to abort the whole run.
        with open(filename, 'r', encoding="utf-8", errors="replace") as f:
            content = f.read()
    except FileNotFoundError:
        print(f"[ERROR] Input file '{filename}' not found.")
        return None, None
    except OSError as exc:
        # E.g. the path is a directory or is not readable.
        print(f"[ERROR] Could not read input file '{filename}': {exc}")
        return None, None

    script_pattern = re.compile(r"workshop_download_item\s+(\d+)\s+(\d+)")
    # The backreference requires the closing quote to match the opening one.
    quoted_pattern = re.compile(r"""(["'])(\d+)\1""")
    plain_pattern = re.compile(r"\b\d+\b")

    script_matches = script_pattern.findall(content)
    if script_matches:
        print(f"Detected 'steamcmd script' format in '{filename}'.")
        app_id = script_matches[0][0]
        if any(found_app_id != app_id for found_app_id, _ in script_matches):
            print("[ERROR] Inconsistent App IDs found in the script file. Aborting.")
            return None, None
        return app_id, {item_id for _, item_id in script_matches}

    quoted_ids = {match.group(2) for match in quoted_pattern.finditer(content)}
    if quoted_ids:
        print(f"Detected 'JSON array' format in '{filename}'.")
        return None, quoted_ids

    # Last resort: bare numbers, so the missing-ID list printed by the
    # verification step can be pasted into a new input file unchanged.
    plain_ids = set(plain_pattern.findall(content))
    if plain_ids:
        print(f"Detected 'plain ID list' format in '{filename}'.")
        return None, plain_ids

    print(f"[ERROR] Could not find any valid item IDs in '{filename}'.")
    return None, None
def execute_formatting(item_ids, chosen_game_id, output_filename):
    """Write a steamcmd script that downloads every item in *item_ids*.

    The file gets one ``workshop_download_item <game id> <item id>`` line
    per item, with the IDs in sorted (string) order. Any existing file at
    *output_filename* is overwritten.
    """
    print("\n--- Writing Script File ---")

    ordered_ids = list(item_ids)
    ordered_ids.sort()
    script_text = "".join(
        f"workshop_download_item {chosen_game_id} {workshop_id}\n"
        for workshop_id in ordered_ids
    )

    with open(output_filename, 'w') as script_file:
        script_file.write(script_text)

    print(f"Successfully wrote {len(item_ids)} items to '{output_filename}'.")
def perform_cleanup(app_id):
    """Delete leftover workshop state before a new download run.

    Removes the ``downloads``, ``content`` and ``temp`` directories under
    STEAMAPPS_WORKSHOP_DIR (the whole directories, not only the part that
    belongs to *app_id*) and the ``appworkshop_<app_id>.acf`` file. Missing
    paths are reported and skipped; a failed removal is reported and does
    not stop the remaining cleanup.
    """
    print("\n--- Performing Pre-run Cleanup ---")

    stale_dirs = [
        os.path.join(STEAMAPPS_WORKSHOP_DIR, leaf)
        for leaf in ('downloads', 'content', 'temp')
    ]
    manifest_path = os.path.join(STEAMAPPS_WORKSHOP_DIR, f'appworkshop_{app_id}.acf')

    for target in stale_dirs:
        if not os.path.isdir(target):
            print(f" Directory not found, skipping: {target}")
            continue
        try:
            shutil.rmtree(target)
            print(f" Successfully removed directory: {target}")
        except Exception as err:
            print(f" [ERROR] Failed to remove directory {target}: {err}")

    if not os.path.isfile(manifest_path):
        print(f" File not found, skipping: {manifest_path}")
        return
    try:
        os.remove(manifest_path)
        print(f" Successfully removed file: {manifest_path}")
    except Exception as err:
        print(f" [ERROR] Failed to remove file {manifest_path}: {err}")
def run_steamcmd(login_info, script_filename):
    """Run steamcmd with the generated script, streaming its output to the log.

    The steamcmd executable (``steamcmd.exe`` on Windows, ``steamcmd.sh``
    elsewhere) must be in the current working directory. Its stdout and
    stderr are merged and written line by line to LOG_FILENAME while one
    progress dot per line is printed to the console.

    Args:
        login_info: steamcmd login command text without the leading "+",
            e.g. ``"login <user> <password>"``.
        script_filename: Script file handed to steamcmd's ``+runscript``.

    Returns:
        False if the executable is missing or steamcmd could not be run or
        logged. True once steamcmd has exited, including with a non-zero
        exit code (reported as a warning) so the caller still verifies the
        downloads.
    """
    steamcmd_exec = "steamcmd.sh" if sys.platform != "win32" else "steamcmd.exe"
    if not os.path.exists(steamcmd_exec):
        print(f"\n[ERROR] '{steamcmd_exec}' not found. Please run this script from your steamcmd directory.")
        return False

    # NOTE(review): the login command is passed as ONE argument
    # ("+login user pass"), exactly as before; confirm steamcmd accepts this
    # form before splitting it into separate arguments.
    command = [f"./{steamcmd_exec}", f"+{login_info}", "+runscript", script_filename, "+quit"]

    print("\n--- Running SteamCMD ---")
    # NOTE(review): this echoes the login credentials to the console.
    print(f"Executing command: {' '.join(command)}")
    print(f"Logging output to '{LOG_FILENAME}' in real-time. This may take a while.")
    print("Progress: ", end='', flush=True)

    try:
        # The context manager closes the pipe and reaps the child on every
        # exit path. "backslashreplace" keeps undecodable output bytes from
        # aborting the run and always yields text the log can encode.
        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            errors="backslashreplace",
            bufsize=1,
        ) as process:
            try:
                # Line-buffered so the log really is written in real time.
                with open(LOG_FILENAME, 'w', buffering=1) as log_file:
                    for line in process.stdout:
                        log_file.write(line)
                        print(".", end='', flush=True)
            except BaseException:
                # Do not leave steamcmd running if logging failed or the
                # user interrupted; the error is re-raised afterwards.
                process.kill()
                raise
        return_code = process.returncode
    except Exception as e:
        print(f"\n[ERROR] An error occurred while running SteamCMD: {e}")
        return False

    print("\n" + "-" * 40)
    if return_code == 0:
        print(f"[SUCCESS] SteamCMD has finished. Log saved to '{LOG_FILENAME}'.")
    else:
        print(f"[WARNING] SteamCMD finished with exit code {return_code}. Check '{LOG_FILENAME}' for errors.")
    # A non-zero exit still returns True: some items may have downloaded,
    # and the verification step reports exactly which ones are missing.
    return True
def perform_verification(app_id, expected_ids):
    """Report which expected workshop items have no downloaded directory.

    Lists the all-digit entry names under ``WORKSHOP_BASE_PATH/<app_id>``
    and compares them with *expected_ids* (a set of ID strings). Prints a
    success message when nothing is missing, otherwise the sorted list of
    missing IDs. Nothing is returned.
    """
    print("\n--- Verifying Downloads ---")
    app_content_dir = os.path.join(WORKSHOP_BASE_PATH, app_id)
    print(f"Checking for directories in '{app_content_dir}'...")

    if os.path.isdir(app_content_dir):
        # Only all-digit names are treated as workshop item entries.
        downloaded = set(filter(str.isdigit, os.listdir(app_content_dir)))
        print(f" Expected {len(expected_ids)} items, found {len(downloaded)} downloaded directories.")
    else:
        print(" Warning: Content directory does not exist. All items are considered missing.")
        downloaded = set()

    absent = expected_ids.difference(downloaded)

    print("\n--- Verification Complete ---")
    if not absent:
        print(f"✅ SUCCESS: All {len(expected_ids)} expected items are present!")
        return

    separator = "---------------------------"
    print(f"❌ MISSING ITEMS: Found {len(absent)} missing item director(y/ies).")
    print(separator)
    for workshop_id in sorted(absent):
        print(f" - {workshop_id}")
    print(separator)
    print("Suggestion: Copy the missing IDs into a new input file and run this script again.")
def main():
    """Run the interactive workflow: parse IDs, pick a game, download, verify.

    Steps:
      1. Show the instructions and the game menu.
      2. Read item IDs (and possibly an App ID) from the input file; exit
         with status 1 if parsing fails.
      3. Determine the game: offer the detected App ID when it belongs to a
         configured game, otherwise (or if the user declines) ask for a
         menu number until a valid one is entered.
      4. Write the steamcmd script, clean old workshop state, run steamcmd
         and, if it ran, verify the downloaded items.
    """
    display_initial_instructions()
    display_welcome()

    input_filename = input(f"Enter input filename (default: '{DEFAULT_INPUT_FILENAME}'): ") or DEFAULT_INPUT_FILENAME
    detected_app_id, all_item_ids = parse_universal_input_file(input_filename)
    if all_item_ids is None:
        sys.exit(1)

    chosen_game_info = None
    if detected_app_id:
        known_game = next(
            (game for game in GAME_CANDIDATES.values() if game['id'] == detected_app_id),
            None,
        )
        if known_game is None:
            # No login info is configured for this App ID, so it cannot be
            # used. Say so instead of asking a question whose "y" answer
            # would be ignored.
            print(f"Detected App ID {detected_app_id} is not one of the available games. Please choose a game from the list.")
        else:
            confirm = input(f"Detected App ID {detected_app_id} ({known_game['name']}). Use this game? (y/n): ").lower()
            if confirm.strip() == 'y':
                chosen_game_info = known_game

    while chosen_game_info is None:
        user_input = input("\nEnter the number for the game you want to use: ")
        try:
            chosen_game_info = GAME_CANDIDATES[int(user_input)]
        except (ValueError, KeyError):
            print("[ERROR] Invalid selection. Please enter a valid number.")

    output_filename = input(f"Enter output script name (default: '{DEFAULT_OUTPUT_FILENAME}'): ") or DEFAULT_OUTPUT_FILENAME
    execute_formatting(all_item_ids, chosen_game_info['id'], output_filename)
    perform_cleanup(chosen_game_info['id'])
    if run_steamcmd(chosen_game_info['login_info'], output_filename):
        perform_verification(chosen_game_info['id'], all_item_ids)
# Run the interactive workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()