import obspython as obs # type: ignore
import json
import os
import requests
import webbrowser
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
import socket
import time
import psutil # type: ignore
import re
# --- Twitch credentials and OAuth state -------------------------------------
# Credentials are read from config.json, tokens from tokens.json.
client_id = client_secret = None
access_token = refresh_token = None
redirect_uri = None
auth_initiated = False
last_token_refresh = 0  # time.time() value recorded at the last token grant/refresh
TOKEN_REFRESH_INTERVAL = 3600  # seconds; 1 hour for better safety margin

# --- Background workers -------------------------------------------------------
monitor_thread = server_thread = None
stop_monitoring = False  # set to True to ask the monitor loop to end
current_category = None  # last category successfully pushed to Twitch

# --- Discord detectable-games data ----------------------------------------------
discord_games_cache = {}  # executable name -> game name, from the latest fetch
discord_games_persistent = {}  # persistent storage that accumulates all games ever seen
last_discord_fetch = 0  # time.time() of the last successful fetch (0 = never)
failed_categories = set()  # categories that failed to match on Twitch
# Blacklist: helper executables (crash handlers, browser sub-processes,
# launchers, anti-cheat) that run alongside many different games.
# They are never used for category detection.
process_blacklist = {
    # Crash reporting
    "crashhandler.exe",
    "crashreporter.exe",
    "unitycrashhandler32.exe",
    "unitycrashhandler64.exe",
    # Embedded browser sub-processes
    "cefprocess.exe",
    "unrealcefsubprocess.exe",
    # Stores and launchers
    "epicgameslauncher.exe",
    "launcher.exe",
    "steam.exe",
    "steamservice.exe",
    # Anti-cheat
    "battleye.exe",
    "easyanticheat.exe",
}
# Manual overrides - highest priority (50-90).
# When several known executables are running, the one with the larger number
# decides the category.
process_priorities = {
    # Development tools (50)
    "code.exe": 50,
    "cursor.exe": 50,
    "vscode.exe": 50,
    "Vscodium.exe": 50,
    # Video editing (60)
    "shotcut.exe": 60,
    # Games (90)
    "amongus.exe": 90,
    "BloonsTD6.exe": 90,
    "cities.exe": 90,
    "cities2.exe": 90,
    "cs2.exe": 90,
    "cyberpunk2077.exe": 90,
    "dota2.exe": 90,
    "fortniteclient-win64-shipping.exe": 90,
    "gta5.exe": 90,
    "gtav.exe": 90,
    "huntgame.exe": 90,
    "jigsaw.exe": 90,
    "league of legends.exe": 90,
    "leagueclient.exe": 90,
    "minecraft.exe": 90,
    "overwatch.exe": 90,
    "paladins.exe": 90,
    "pubg.exe": 90,
    "r5apex.exe": 90,
    "r5apex_dx12.exe": 90,
    "raft.exe": 90,
    "rainbow6.exe": 90,
    "rdr2.exe": 90,
    "rocketleague.exe": 90,
    "valorant.exe": 90,
}
# Manual database: executable name -> Twitch category name.
process_categories = {
    # Development tools
    "code.exe": "Software and Game Development",
    "cursor.exe": "Software and Game Development",
    "VsCode.exe": "Software and Game Development",
    "vscodium.exe": "Software and Game Development",
    # Video editing
    "shotcut.exe": "Editor's Hell",
    # Games
    "amongus.exe": "Among Us",
    "bloonstd6.exe": "Bloons TD 6",
    "cities.exe": "Cities: Skylines",
    "cities2.exe": "Cities: Skylines II",
    "cs2.exe": "Counter-Strike",
    "cyberpunk2077.exe": "Cyberpunk 2077",
    "dota2.exe": "Dota 2",
    "fortniteclient-win64-shipping.exe": "Fortnite",
    "gta5.exe": "Grand Theft Auto V",
    "gtav.exe": "Grand Theft Auto V",
    "huntgame.exe": "Hunt: Showdown 1896",
    "jigsaw.exe": "Jigsaw Puzzle Dreams",
    "league of legends.exe": "League of Legends",
    "leagueclient.exe": "League of Legends",
    "minecraft.exe": "Minecraft",
    "overwatch.exe": "Overwatch 2",
    "paladins.exe": "Paladins",
    "pubg.exe": "PLAYERUNKNOWN'S BATTLEGROUNDS",
    "r5apex.exe": "Apex Legends",
    "r5apex_dx12.exe": "Apex Legends",
    "raft.exe": "Raft",
    "rainbow6.exe": "Tom Clancy's Rainbow Six Siege",
    "rdr2.exe": "Red Dead Redemption 2",
    "rocketleague.exe": "Rocket League",
    "valorant.exe": "VALORANT",
}
# Lowercase-keyed copies of the manual tables, built once at import time so
# they can be queried with lowercased process names.
process_priorities_lower = dict(
    (exe_name.lower(), priority) for exe_name, priority in process_priorities.items()
)
process_categories_lower = dict(
    (exe_name.lower(), category) for exe_name, category in process_categories.items()
)
# One trailing edition marker (case-insensitive):
#   * a separator (colon, hyphen, en/em dash) followed by an edition name, or
#   * whitespace followed by a bare marker such as "Remastered" or "HD".
# Compiled once at import time instead of on every call.
_EDITION_SUFFIX_RE = re.compile(
    r"(?:"
    r"[:\-\u2013\u2014]\s*(?:"
    r"Game of the Year Edition|GOTY Edition|GOTY|Digital Deluxe Edition|"
    r"Deluxe Edition|Gold Edition|Ultimate Edition|Complete Edition|"
    r"Definitive Edition|Enhanced Edition|Special Edition|"
    r"Collector['\u2019]s Edition|Premium Edition|Legendary Edition|"
    r"Prepare To Die Edition"
    r")"
    r"|\s+(?:Remastered|HD|Director['\u2019]?s? Cut)"
    r")$",
    re.IGNORECASE,
)


def strip_edition_suffix(game_name):
    """
    Remove common edition suffixes from game names to improve Twitch matching.

    Suffixes are removed repeatedly, so stacked markers such as
    "Game: Gold Edition Remastered" are reduced all the way to "Game".

    Examples:
    - "Fallout 3: Game of the Year Edition" -> "Fallout 3"
    - "STAR WARS™ Battlefront™ II" -> "STAR WARS™ Battlefront™ II" (no change)
    - "BioShock™ Remastered" -> "BioShock™"
    - "Death Stranding Director's Cut" -> "Death Stranding"

    Args:
        game_name: Game title as a string.

    Returns:
        The title without trailing edition markers and without surrounding
        whitespace. If stripping would leave nothing, the whitespace-trimmed
        original is returned instead of an empty string.
    """
    original = game_name.strip()
    stripped_name = original
    while True:
        # rstrip() so the next pass is anchored on real text, not on the
        # whitespace that preceded the suffix just removed.
        candidate = _EDITION_SUFFIX_RE.sub('', stripped_name).rstrip()
        if candidate == stripped_name:
            break
        stripped_name = candidate
    # Never hand an empty search term to the caller.
    return stripped_name or original
def script_description():
    """Return the human-readable summary of what this script does."""
    description = (
        "Automatically updates Twitch category based on running applications. "
        "Uses manual database + Discord detectable games as fallback. "
        "Discord database updates on each OBS startup."
    )
    return description
def script_properties():
    """Build the script's property sheet, which consists of three buttons.

    Returns:
        The properties object created by ``obs.obs_properties_create()`` with
        the login, token-refresh and Discord-refresh buttons attached.
    """
    props = obs.obs_properties_create()
    # (property name, button label, click callback)
    buttons = (
        ("login_button", "Login with Twitch", login_button_clicked),
        ("refresh_token_button", "Refresh Token", refresh_token_button_clicked),
        ("refresh_discord_button", "Refresh Discord Database", refresh_discord_button_clicked),
    )
    for prop_name, label, on_click in buttons:
        obs.obs_properties_add_button(props, prop_name, label, on_click)
    return props
def script_load(settings):
    """Initialise the script: load config, tokens and game databases, then start monitoring.

    The process monitor is started only when stored tokens exist and are
    either still valid or can be refreshed; otherwise a hint to log in is
    written to the script log.

    Args:
        settings: Passed in by the caller; not used here.
    """
    global client_id, client_secret, access_token, refresh_token
    client_id, client_secret = load_config()
    load_access_tokens()
    load_persistent_discord_database()  # accumulated games from disk
    fetch_discord_games()  # latest list from Discord, merged into the above

    if not (access_token and refresh_token):
        script_log("No valid tokens found. Please authenticate using the 'Login with Twitch' button.")
        return

    # refresh_access_token() is attempted only if validation fails.
    tokens_usable = validate_token() or refresh_access_token()
    if not tokens_usable:
        script_log("Failed to validate or refresh tokens. Please re-authenticate using the 'Login with Twitch' button.")
        return

    start_process_monitor()
def script_unload():
    """Ask the monitor loop to stop and wait briefly for the worker threads."""
    global stop_monitoring, monitor_thread, server_thread

    def _wait_briefly(worker):
        # Give a live thread up to two seconds to finish; never block longer.
        if worker and worker.is_alive():
            worker.join(timeout=2)

    stop_monitoring = True
    _wait_briefly(monitor_thread)
    _wait_briefly(server_thread)
def load_config():
    """Read the Twitch application credentials from config.json next to this script.

    Returns:
        A ``(client_id, client_secret)`` tuple. Either value is None when the
        key is absent; ``(None, None)`` is returned when the file is missing
        or cannot be read or parsed (the problem is logged).
    """
    config_path = os.path.join(os.path.dirname(__file__), 'config.json')

    if not os.path.exists(config_path):
        script_log("Config file not found. Please create a config.json file with your client_id and client_secret.")
        return None, None

    try:
        with open(config_path, 'r') as f:
            config = json.load(f)
        return config.get('client_id'), config.get('client_secret')
    except (json.JSONDecodeError, IOError) as e:
        script_log(f"Error reading config.json: {e}")

    return None, None
def script_log(message):
    """Write *message* to the OBS script log at info level, prefixed with the script tag."""
    tagged_message = f"[Twitch Category Updater] {message}"
    obs.script_log(obs.LOG_INFO, tagged_message)
def login_button_clicked(props, prop):
    """Button callback: start the Twitch OAuth flow on a background thread.

    Args:
        props: Supplied by the caller; unused.
        prop: Supplied by the caller; unused.

    Returns:
        Always True.
    """
    global auth_initiated
    auth_initiated = False
    oauth_worker = threading.Thread(target=start_oauth_flow, daemon=True)
    oauth_worker.start()
    return True
def refresh_token_button_clicked(props, prop):
    """Button callback: refresh the access token and make sure the monitor is running.

    Args:
        props: Supplied by the caller; unused.
        prop: Supplied by the caller; unused.

    Returns:
        Always True, whether or not the refresh succeeded.
    """
    if not refresh_access_token():
        script_log("Failed to refresh token. Please re-authenticate.")
        return True

    script_log("Token refreshed successfully!")
    monitor_running = bool(monitor_thread) and monitor_thread.is_alive()
    if not monitor_running:
        start_process_monitor()
    return True
def refresh_discord_button_clicked(props, prop):
    """Button callback: force a re-fetch of the Discord games list on a background thread.

    Args:
        props: Supplied by the caller; unused.
        prop: Supplied by the caller; unused.

    Returns:
        Always True.
    """
    refresher = threading.Thread(
        target=fetch_discord_games,
        kwargs={'force_refresh': True},
        daemon=True,
    )
    refresher.start()
    return True
def should_refresh_token():
    """Return True once at least TOKEN_REFRESH_INTERVAL seconds have passed since the last refresh."""
    elapsed = time.time() - last_token_refresh
    if elapsed >= TOKEN_REFRESH_INTERVAL:
        return True
    return False
def start_oauth_flow():
    """Run the Twitch authorization-code login through a temporary local HTTP server.

    Steps:
      1. Verify that the fixed redirect port is free.
      2. Start an HTTP server on ``localhost:<port>`` in a daemon thread.
      3. Open the Twitch authorize page in the default browser.
      4. When Twitch redirects back with ``?code=...``, exchange the code via
         ``get_access_token``, start the process monitor on success, and shut
         the server down. A redirect carrying ``?error=...`` is logged and
         also shuts the server down so the port is released.

    Sets the module globals ``redirect_uri`` and ``server_thread``. All
    failures are logged; nothing is raised to the caller.
    """
    global redirect_uri, server_thread
    # Local import so this function is self-contained with respect to the
    # file's import block.
    from urllib.parse import parse_qs, urlencode, urlsplit

    if not client_id:
        script_log("Cannot start OAuth flow: client_id is missing. Please check config.json")
        return

    try:
        port = 1111  # Fixed port for OAuth redirect URL
        # Check if port is available; the context manager closes the probe
        # socket even when bind() fails.
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
                test_socket.bind(('localhost', port))
        except OSError:
            script_log(f"ERROR: Port {port} is already in use. Please close the application using it or change the port in the script (start_oauth_flow) and Twitch Developer Console.")
            return

        redirect_uri = f"http://localhost:{port}"

        class OAuthHandler(BaseHTTPRequestHandler):
            """Handles the single redirect request sent by the browser."""

            def log_message(self, format, *args):
                pass  # Suppress HTTP server logs

            def _shutdown_soon(self):
                # shutdown() blocks until serve_forever() returns, so it must
                # run on a different thread than the one handling this request.
                # The short delay lets the response reach the browser first.
                def _stop():
                    time.sleep(2)
                    self.server.shutdown()
                threading.Thread(target=_stop).start()

            def do_GET(self):
                query = parse_qs(urlsplit(self.path).query)
                code = query.get("code", [""])[0]

                if not code:
                    # Anything without a code is rejected; unrelated browser
                    # requests must not stop the server.
                    error = query.get("error", [""])[0]
                    if error:
                        description = query.get("error_description", [""])[0]
                        script_log(f"Twitch authorization was not granted: {error} {description}".rstrip())
                    self.send_response(400)
                    self.end_headers()
                    self.wfile.write(b"Invalid request\n")
                    if error:
                        # The flow is over; release the port for a later retry.
                        self._shutdown_soon()
                    return

                success = get_access_token(code)
                self.send_response(200)
                self.send_header('Content-type', 'text/html')
                self.end_headers()
                if success:
                    self.wfile.write(
                        b"<html><body><h1>Authentication successful!</h1>"
                        b"<p>You can close this window and return to OBS.</p></body></html>"
                    )
                    start_process_monitor()
                else:
                    self.wfile.write(
                        b"<html><body><h1>Authentication failed!</h1>"
                        b"<p>Please check OBS logs and try again.</p></body></html>"
                    )
                self._shutdown_soon()

        server = HTTPServer(('localhost', port), OAuthHandler)

        def _serve():
            # Close the listening socket once serving ends so the port can be
            # reused by the next login attempt.
            try:
                server.serve_forever()
            finally:
                server.server_close()

        server_thread = threading.Thread(target=_serve, daemon=True)
        server_thread.start()

        # urlencode() escapes the redirect URI and the scope.
        auth_query = urlencode({
            "client_id": client_id,
            "redirect_uri": redirect_uri,
            "response_type": "code",
            "scope": "channel:manage:broadcast",
        })
        auth_url = f"https://id.twitch.tv/oauth2/authorize?{auth_query}"
        webbrowser.open(auth_url)
        script_log(f"OAuth server started on port {port}. Opening browser for authentication...")
    except Exception as e:
        script_log(f"Failed to start OAuth flow: {e}")
def get_access_token(code):
    """Exchange an OAuth authorization code for access and refresh tokens.

    On success the module globals ``access_token``, ``refresh_token`` and
    ``last_token_refresh`` are updated and the tokens are written to disk via
    ``save_access_tokens``.

    Args:
        code: Authorization code received on the redirect URI.

    Returns:
        True when both tokens were obtained, False otherwise (the reason is
        logged). The globals are left untouched on failure.
    """
    global access_token, refresh_token, last_token_refresh
    token_url = "https://id.twitch.tv/oauth2/token"
    data = {
        "client_id": client_id,
        "client_secret": client_secret,
        "code": code,
        "grant_type": "authorization_code",
        "redirect_uri": redirect_uri
    }
    try:
        response = requests.post(token_url, data=data, timeout=30)
        response.raise_for_status()
        token_data = response.json()
        # Read both values before touching the globals so a malformed
        # response cannot leave a half-updated token pair behind.
        new_access_token = token_data["access_token"]
        new_refresh_token = token_data["refresh_token"]
    except requests.exceptions.RequestException as e:
        script_log(f"Failed to obtain access token: {e}")
        return False
    except (KeyError, TypeError, ValueError) as e:
        # Body was not JSON, not an object, or lacked a token field.
        script_log(f"Failed to obtain access token: unexpected response ({e!r})")
        return False

    access_token = new_access_token
    refresh_token = new_refresh_token
    last_token_refresh = time.time()
    save_access_tokens(access_token, refresh_token)
    script_log("Successfully logged in!")
    return True
def save_access_tokens(access_token, refresh_token):
    """Persist the token pair, plus the current time, to tokens.json next to this script.

    Args:
        access_token: Access token to store.
        refresh_token: Refresh token to store.

    Write errors are logged, not raised.
    """
    token_path = os.path.join(os.path.dirname(__file__), 'tokens.json')
    try:
        with open(token_path, 'w') as f:
            payload = {
                "access_token": access_token,
                "refresh_token": refresh_token,
                "timestamp": time.time(),
            }
            json.dump(payload, f)
    except IOError as e:
        script_log(f"Failed to save tokens: {e}")
def refresh_access_token():
    """Obtain a new access token using the stored refresh token.

    On success the module globals ``access_token``, ``last_token_refresh``
    and, when the response carries one, ``refresh_token`` are updated and the
    tokens are saved to disk.

    The in-memory tokens are discarded only when Twitch itself rejects the
    refresh (HTTP 400/401). Transient problems such as timeouts, connection
    errors or 5xx responses keep the tokens so a later attempt can succeed
    without forcing the user to log in again.

    Returns:
        True on success, False otherwise (the reason is logged).
    """
    global access_token, refresh_token, last_token_refresh
    if not refresh_token:
        script_log("No refresh token available. Please re-authenticate.")
        return False

    token_url = "https://id.twitch.tv/oauth2/token"
    data = {
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "refresh_token",
        "refresh_token": refresh_token
    }
    try:
        response = requests.post(token_url, data=data, timeout=30)
        response.raise_for_status()
        token_data = response.json()
        new_access_token = token_data["access_token"]
    except requests.exceptions.HTTPError as e:
        # Must precede the RequestException handler: HTTPError is a subclass.
        script_log(f"Failed to refresh access token: {e}")
        status = e.response.status_code if e.response is not None else None
        if status in (400, 401):
            # The refresh token itself is no longer accepted.
            access_token = None
            refresh_token = None
        return False
    except requests.exceptions.RequestException as e:
        script_log(f"Failed to refresh access token: {e}")
        return False
    except (KeyError, TypeError, ValueError) as e:
        # Body was not JSON, not an object, or lacked the access token.
        script_log(f"Failed to refresh access token: unexpected response ({e!r})")
        return False

    access_token = new_access_token
    if "refresh_token" in token_data:
        refresh_token = token_data["refresh_token"]
    last_token_refresh = time.time()
    save_access_tokens(access_token, refresh_token)
    script_log("Successfully refreshed access token!")
    return True
def load_access_tokens():
    """Load the saved token pair and its timestamp from tokens.json into the module globals.

    Does nothing when the file does not exist. Read and parse errors are
    logged and leave the globals as they were.
    """
    global access_token, refresh_token, last_token_refresh
    token_path = os.path.join(os.path.dirname(__file__), 'tokens.json')
    if not os.path.exists(token_path):
        return

    try:
        with open(token_path, 'r') as f:
            stored = json.load(f)
        access_token = stored.get("access_token")
        refresh_token = stored.get("refresh_token")
        last_token_refresh = stored.get("timestamp", 0)
    except (json.JSONDecodeError, IOError) as e:
        script_log(f"Error loading tokens: {e}")
def fetch_discord_games(force_refresh=False):
    """Download Discord's detectable-games list and merge it into the local databases.

    ``discord_games_cache`` is replaced with the freshly downloaded mapping of
    executable name -> game name. ``discord_games_persistent`` only gains
    executables it did not already contain and is saved to disk when at least
    one was added. If the request fails, the cache falls back to a copy of
    the persistent database.

    Args:
        force_refresh: When False, the fetch is skipped if one already
            succeeded earlier in this session.
    """
    global discord_games_cache, discord_games_persistent, last_discord_fetch

    # Only skip if we already fetched during this session (unless forced)
    already_fetched = last_discord_fetch > 0
    if already_fetched and not force_refresh:
        return

    try:
        script_log("Fetching Discord detectable games database...")
        response = requests.get(
            "https://discord.com/api/v9/applications/detectable",
            timeout=15
        )
        response.raise_for_status()
        games_data = response.json()

        # Build current cache: executable -> game name
        fresh_cache = {}
        added = 0
        for game in games_data:
            if 'executables' not in game or 'name' not in game:
                continue
            for exe in game['executables']:
                if 'name' not in exe:
                    continue
                # Keep just the filename: "folder/game.exe" -> "game.exe"
                exe_name = exe['name'].lower().split('/')[-1]
                # Skip blacklisted utility processes
                if exe_name in process_blacklist:
                    continue
                title = game['name']
                fresh_cache[exe_name] = title
                # Add to persistent database if not already there
                if exe_name not in discord_games_persistent:
                    discord_games_persistent[exe_name] = title
                    added += 1

        discord_games_cache = fresh_cache
        last_discord_fetch = time.time()

        if added:
            script_log(f"Discord database updated: {len(discord_games_cache)} current games, {added} new games added to persistent database")
            save_persistent_discord_database()
        else:
            script_log(f"Discord database refreshed: {len(discord_games_cache)} current games, {len(discord_games_persistent)} total accumulated")
    except requests.exceptions.RequestException as e:
        script_log(f"Failed to fetch Discord games: {e}")
        # Use persistent database as fallback
        if discord_games_persistent:
            discord_games_cache = discord_games_persistent.copy()
            script_log(f"Using persistent database: {len(discord_games_cache)} games")
def load_persistent_discord_database():
    """Load the accumulated Discord games database from disk.

    Fills the global ``discord_games_persistent`` from the ``games`` entry of
    discord_games_persistent.json next to this script. The global is reset to
    an empty dict when the file is missing or cannot be read or parsed.
    """
    global discord_games_persistent
    db_path = os.path.join(os.path.dirname(__file__), 'discord_games_persistent.json')

    if not os.path.exists(db_path):
        script_log("No persistent Discord database found, will create new one")
        discord_games_persistent = {}
        return

    try:
        with open(db_path, 'r', encoding='utf-8') as f:
            stored = json.load(f)
        discord_games_persistent = stored.get('games', {})
        saved_at = stored.get('timestamp', 0)
        script_log(f"Loaded persistent Discord database: {len(discord_games_persistent)} games (last updated: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(saved_at))})")
    except (json.JSONDecodeError, IOError) as e:
        script_log(f"Error loading persistent Discord database: {e}")
        discord_games_persistent = {}
def save_persistent_discord_database():
    """Save the accumulated Discord games database to disk.

    Writes the games mapping, the current time and the entry count as
    indented UTF-8 JSON to discord_games_persistent.json next to this script.
    Write errors are logged, not raised.
    """
    db_path = os.path.join(os.path.dirname(__file__), 'discord_games_persistent.json')
    try:
        with open(db_path, 'w', encoding='utf-8') as f:
            snapshot = {
                'games': discord_games_persistent,
                'timestamp': time.time(),
                'total_games': len(discord_games_persistent),
            }
            json.dump(snapshot, f, indent=2, ensure_ascii=False)
        script_log(f"Saved persistent Discord database: {len(discord_games_persistent)} total games")
    except IOError as e:
        script_log(f"Failed to save persistent Discord database: {e}")
def _find_exact_twitch_category(name, headers):
    """Search Twitch categories for *name* and return the entry whose name
    equals it case-insensitively, or None. Raises on HTTP/network errors."""
    response = requests.get(
        f'https://api.twitch.tv/helix/search/categories?query={requests.utils.quote(name)}',
        headers=headers,
        timeout=15
    )
    response.raise_for_status()
    wanted = name.lower()
    candidates = response.json().get('data', [])
    return next((cat for cat in candidates if cat['name'].lower() == wanted), None)


def update_twitch_category(category):
    """Set the broadcaster's Twitch category to *category*.

    Flow:
      1. Return early if *category* is already current or is known not to
         exist on Twitch.
      2. Read ``broadcaster_name`` from config.json and resolve it to a user
         id (up to three attempts, refreshing the token on HTTP 401).
      3. Search for an exact, case-insensitive category match; if none is
         found, retry with the edition suffix stripped.
      4. PATCH the channel with the matched game id (one retry after a token
         refresh on HTTP 401).

    Args:
        category: Category / game name to apply.

    Returns:
        True if the category is already current or was updated; False on any
        failure, including "no matching category on Twitch" (remembered in
        ``failed_categories`` so it is not searched again).
    """
    global current_category, failed_categories

    if category == current_category:
        return True

    # Skip names that already failed, both verbatim and with the suffix stripped.
    stripped_category = strip_edition_suffix(category)
    if category in failed_categories and stripped_category in failed_categories:
        return False

    if not access_token:
        script_log("Not authenticated with Twitch")
        return False

    headers = {
        'Client-ID': client_id,
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }

    try:
        config_path = os.path.join(os.path.dirname(__file__), 'config.json')
        with open(config_path, 'r') as f:
            config = json.load(f)
        broadcaster_name = config.get('broadcaster_name')
        if not broadcaster_name:
            script_log("broadcaster_name not found in config.json")
            return False
    except (IOError, json.JSONDecodeError) as e:
        script_log(f"Error reading broadcaster name from config: {e}")
        return False

    try:
        max_retries = 3
        retry_delay = 2
        user_response = None
        for attempt in range(max_retries):
            try:
                user_response = requests.get(
                    # quote() so unusual characters in the configured login
                    # cannot corrupt the query string
                    f'https://api.twitch.tv/helix/users?login={requests.utils.quote(str(broadcaster_name))}',
                    headers=headers,
                    timeout=15
                )
                if user_response.status_code == 401:
                    script_log("Token invalid during user lookup, attempting refresh...")
                    if not refresh_access_token():
                        return False
                    headers['Authorization'] = f'Bearer {access_token}'
                    continue
                user_response.raise_for_status()
                break
            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    script_log(f"Failed to get user ID after {max_retries} attempts: {e}")
                    return False
                time.sleep(retry_delay)
        else:
            # No attempt produced a usable response (the last one was still a
            # 401). Report that instead of misreading the error body as
            # "user not found".
            script_log(f"Failed to get user ID after {max_retries} attempts: still unauthorized")
            return False

        user_data = user_response.json().get('data', [])
        if not user_data:
            script_log(f"User not found: {broadcaster_name}")
            return False
        broadcaster_id = user_data[0]['id']

        # First try: exact match with the full name
        exact_match = _find_exact_twitch_category(category, headers)
        matched_using_stripped = False
        has_distinct_stripped_name = stripped_category.lower() != category.lower()

        # Second try: the name without its edition suffix, if that differs
        if not exact_match and has_distinct_stripped_name:
            script_log(f"No exact match for '{category}', trying stripped version: '{stripped_category}'")
            exact_match = _find_exact_twitch_category(stripped_category, headers)
            if exact_match:
                matched_using_stripped = True
                # Remove from failed cache if we found it with stripping
                failed_categories.discard(category)

        if not exact_match:
            script_log(f"No match found on Twitch for '{category}' (tried stripped version too). Skipping update.")
            failed_categories.add(category)
            if has_distinct_stripped_name:
                failed_categories.add(stripped_category)
            return False

        game_id = exact_match['id']
        actual_category = exact_match['name']
        if matched_using_stripped:
            script_log(f"Matched '{category}' using stripped name -> '{actual_category}'")

        update_response = requests.patch(
            f'https://api.twitch.tv/helix/channels?broadcaster_id={broadcaster_id}',
            headers=headers,
            json={'game_id': game_id},
            timeout=15
        )
        if update_response.status_code == 401:
            script_log("Token invalid during channel update, attempting refresh...")
            if refresh_access_token():
                headers['Authorization'] = f'Bearer {access_token}'
                update_response = requests.patch(
                    f'https://api.twitch.tv/helix/channels?broadcaster_id={broadcaster_id}',
                    headers=headers,
                    json={'game_id': game_id},
                    timeout=15
                )
        update_response.raise_for_status()

        # Remember the requested name (not Twitch's spelling) so the
        # early-return check at the top matches the next identical request.
        current_category = category
        script_log(f"Successfully updated category to: {actual_category}")
        return True
    except requests.exceptions.RequestException as e:
        script_log(f"Failed to update category: {str(e)}")
        return False
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # A response body that is not the expected JSON shape.
        script_log(f"Failed to update category: unexpected response ({e!r})")
        return False
def check_processes():
    """Pick the Twitch category that matches the currently running processes.

    Every running process name is lowercased and looked up, in order, in:
      * the manual database (priority from ``process_priorities_lower``,
        default 50), then
      * the persistent Discord database (fixed priority 30).
    Blacklisted helper processes are ignored. The highest priority wins; on
    a tie the first process encountered is kept.

    Returns:
        The selected category name, or "Just Chatting" when nothing matches
        or the scan fails.
    """
    try:
        highest_priority = -1
        selected_category = None
        for proc in psutil.process_iter(['name']):
            try:
                raw_name = proc.info['name']
                # The name can be None/empty when it could not be read for
                # this process; skip it instead of letting .lower() raise and
                # abort the whole scan.
                if not raw_name:
                    continue
                process_name = raw_name.lower()

                # Skip blacklisted utility/engine processes
                if process_name in process_blacklist:
                    continue

                # Check manual database first (priority 50-90)
                if process_name in process_categories_lower:
                    priority = process_priorities_lower.get(process_name, 50)
                    if priority > highest_priority:
                        highest_priority = priority
                        selected_category = process_categories_lower[process_name]
                # Check persistent Discord database as fallback (priority 30)
                # This database continuously grows and never loses games
                elif process_name in discord_games_persistent:
                    priority = 30  # Lower than all manual entries
                    if priority > highest_priority:
                        highest_priority = priority
                        selected_category = discord_games_persistent[process_name]
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        return selected_category if selected_category is not None else "Just Chatting"
    except Exception as e:
        script_log(f"Error checking processes: {str(e)}")
        return "Just Chatting"
def validate_token():
    """Ask Twitch's validate endpoint whether the current access token is still accepted.

    Returns:
        True only for an HTTP 200 response. False when there is no token,
        the request fails, or any other status comes back (statuses other
        than 401 are logged as unexpected).
    """
    if not access_token:
        return False

    try:
        response = requests.get(
            'https://id.twitch.tv/oauth2/validate',
            headers={'Authorization': f'OAuth {access_token}'},
            timeout=10,
        )
    except requests.exceptions.RequestException as e:
        script_log(f"Error during token validation: {str(e)}")
        return False

    if response.status_code == 200:
        return True
    if response.status_code != 401:
        script_log(f"Unexpected status code during token validation: {response.status_code}")
    return False
# Event owned by the monitor thread that is currently running (None before the
# first start). Setting it stops that one thread only.
_monitor_stop_event = None


def start_process_monitor():
    """Start (or restart) the background thread that keeps the Twitch category in sync.

    Each monitor thread gets its own stop event. A restart signals the old
    thread through that event and starts a fresh thread with a fresh event,
    so the new thread is never stopped by the request aimed at the old one.
    The global ``stop_monitoring`` flag still stops whichever monitor is
    running.

    About once a minute the loop refreshes the token when it is due,
    validates it, detects the category via ``check_processes`` and applies
    it via ``update_twitch_category``. Repeated failures back off
    exponentially, capped at one hour. All waits are interruptible, so a
    stop request takes effect within about a second.
    """
    global monitor_thread, stop_monitoring, _monitor_stop_event

    # Retire the previous monitor, if any, before starting a new one.
    if monitor_thread and monitor_thread.is_alive():
        if _monitor_stop_event is not None:
            _monitor_stop_event.set()
        monitor_thread.join(timeout=2)

    stop_monitoring = False
    stop_event = threading.Event()
    _monitor_stop_event = stop_event

    def should_stop():
        return stop_monitoring or stop_event.is_set()

    def pause(seconds):
        """Wait up to *seconds*, returning early once a stop is requested."""
        deadline = time.time() + seconds
        while not should_stop():
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            # Wake at least once a second so the plain global flag, which
            # cannot wake an Event wait, is noticed promptly too.
            stop_event.wait(min(1.0, remaining))

    def monitor():
        consecutive_failures = 0
        max_consecutive_failures = 5
        while not should_stop():
            try:
                if not client_id or not client_secret:
                    script_log("Missing client credentials. Please check config.json")
                    pause(300)
                    continue

                if should_refresh_token():
                    script_log("1 hour elapsed, refreshing token automatically...")
                    if not refresh_access_token():
                        script_log("Automatic token refresh failed")
                        consecutive_failures += 1
                    else:
                        consecutive_failures = 0

                if consecutive_failures < max_consecutive_failures:
                    if not validate_token():
                        script_log("Token validation failed, attempting refresh...")
                        if not refresh_access_token():
                            consecutive_failures += 1
                            script_log(f"Token refresh failed. Consecutive failures: {consecutive_failures}")
                            # Exponential backoff, capped at one hour.
                            pause(min(60 * (2 ** consecutive_failures), 3600))
                            continue
                        else:
                            consecutive_failures = 0
                else:
                    script_log("Too many consecutive failures. Waiting 1 hour before retry...")
                    pause(3600)
                    consecutive_failures = 0
                    continue

                category = check_processes()
                if update_twitch_category(category):
                    consecutive_failures = 0
                elif category not in failed_categories:
                    # A category that simply does not exist on Twitch is not a
                    # connectivity or token problem, so it must not push the
                    # loop into the one-hour backoff.
                    consecutive_failures += 1
                pause(60)
            except Exception as e:
                script_log(f"Error in monitor thread: {str(e)}")
                consecutive_failures += 1
                pause(60)

    monitor_thread = threading.Thread(target=monitor, daemon=True)
    monitor_thread.start()
    script_log("Process monitor started")