#!/usr/bin/env python3 import requests import xml.etree.ElementTree as ET import html import json import re from pathlib import Path import logging from datetime import datetime # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) FEED_URL = "https://aastatus.net/atom.cgi" WEBHOOK_URL = "https://discord.com/api/webhooks/XXXXXXX" # Discord webhook URL STATE_FILE = Path("/tmp/aaisp_atom_state.json") # Event colors COLOR_MINOR_OPEN = 0xFFFF00 # yellow COLOR_MAJOR_OPEN = 0xFF0000 # red COLOR_CLOSED = 0x2ECC71 # green COLOR_UNKNOWN = 0x95A5A6 # grey ### STATE MANAGEMENT ### def load_state(): if not STATE_FILE.exists(): return {} try: return json.loads(STATE_FILE.read_text()) except Exception: logger.error("State file corrupted, resetting...") return {} def save_state(state): try: STATE_FILE.write_text(json.dumps(state, indent=2)) except Exception as e: logger.error(f"State save failed: {e}") ### FEED PARSING ### def fetch_feed(url): try: resp = requests.get(url, timeout=10) resp.raise_for_status() return resp.text except requests.RequestException as e: logger.error(f"Error fetching feed: {e}") raise def parse_feed_entries(feed_xml): """Return a list of all valid entries in the feed, sorted oldest first.""" entries = [] try: root = ET.fromstring(feed_xml) atom_ns = "{http://www.w3.org/2005/Atom}" for entry in root.findall(f"{atom_ns}entry"): id_elem = entry.find(f"{atom_ns}id") if id_elem is None: continue title_elem = entry.find(f"{atom_ns}title") title_text = title_elem.text.strip() if title_elem is not None and title_elem.text else "No Title" content_elem = entry.find(f"{atom_ns}content") summary_elem = entry.find(f"{atom_ns}summary") if content_elem is not None and content_elem.text and content_elem.text.strip(): content_text = content_elem.text.strip() elif summary_elem is not None and summary_elem.text and summary_elem.text.strip(): content_text = summary_elem.text.strip() else: content_text = title_text # Link link_elem = entry.find(f"{atom_ns}link[@rel='alternate']") if link_elem is not None: link = link_elem.get("href", "") else: first_link = entry.find(f"{atom_ns}link") link = first_link.get("href", "") if first_link is not None else "" # Categories (severity, type, status) severity = "Unknown" categories = [] status = "Unknown" for cat in entry.findall(f".//{atom_ns}category"): label = cat.get("label", "") or cat.get("term", "") scheme = cat.get("scheme", "") if scheme == "https://aastatus.net/severity": severity = label elif scheme == "https://aastatus.net/type": categories.append(label) elif scheme == "https://aastatus.net/status": status = label updated = entry.findtext(f"{atom_ns}updated", "") published = entry.findtext(f"{atom_ns}published", "") entries.append({ "id": id_elem.text.strip(), "title": html.unescape(title_text), "link": link, "updated": updated, "published": published, "categories": ",".join(categories), "severity": severity, "status": status, "content": html.unescape(content_text) }) # Sort entries by published timestamp (oldest first) def parse_date(entry): try: return datetime.fromisoformat(entry["published"].replace("Z", "+00:00")) except Exception: return datetime.min entries.sort(key=parse_date) return entries except ET.ParseError as e: logger.error(f"XML parsing error: {e}") raise ### MARKDOWN CLEANER ### def html_to_markdown(html_content): md = html_content md = re.sub(r"", "\n", md, flags=re.IGNORECASE) md = re.sub(r"", "**", md, flags=re.IGNORECASE) md = re.sub(r"", "\n", md, flags=re.IGNORECASE) md = re.sub(r"<[^>]+>", "", md) return md.strip()[:3500] ### COLOR LOGIC ### def get_event_color(status, severity): SPECIAL_COLORS = { ("PEW", "Open"): 0x51D3D4, ("PEW", "Planned"): 0x51D3D4, } key = (severity, status) if key in SPECIAL_COLORS: return SPECIAL_COLORS[key] if status == "Closed": return COLOR_CLOSED STATUS_SEVERITY_COLORS = { ("Open", "Minor"): COLOR_MINOR_OPEN, ("Open", "Major"): COLOR_MAJOR_OPEN, ("Open", "MSO"): COLOR_MAJOR_OPEN, } return STATUS_SEVERITY_COLORS.get((status, severity), COLOR_UNKNOWN) ### DISCORD FORMAT ### def build_discord_payload(entry, change_type="New entry"): content_md = html_to_markdown(entry["content"]) try: updated_ts = datetime.fromisoformat(entry["updated"].replace("Z", "+00:00")).isoformat() except Exception: updated_ts = entry["updated"] color = get_event_color(entry["status"], entry["severity"]) embed = { "title": f"{entry['title']} ({change_type})", "url": entry["link"], "description": content_md, "timestamp": updated_ts, "color": color, "fields": [ {"name": "Published", "value": entry["published"], "inline": True}, {"name": "Severity", "value": entry["severity"], "inline": True}, {"name": "Status", "value": entry["status"], "inline": True}, {"name": "Categories", "value": entry["categories"], "inline": False}, ] } return {"embeds": [embed]} def post_to_discord(webhook_url, payload): if not webhook_url: logger.error("Discord webhook URL is not set!") return try: resp = requests.post(webhook_url, json=payload) resp.raise_for_status() logger.info("Posted to Discord") except requests.RequestException as e: logger.error(f"Discord post failed: {e}") ### MAIN LOGIC ### def main(): logger.info("[*] Fetching feed...") feed_xml = fetch_feed(FEED_URL) logger.info("[*] Parsing feed entries...") entries = parse_feed_entries(feed_xml) if not entries: logger.warning("No entries found") return state = load_state() # Group entries by incident ID: latest entry wins incidents = {} for entry in entries: incidents[entry["id"]] = entry # overwrite with latest entry for each incident # Post updates per incident for incident_id, entry in incidents.items(): prev = state.get(incident_id) must_post = False change_type = "New entry" if prev is None: must_post = True change_type = "New Incident Detected" else: if entry["status"] != prev.get("status"): must_post = True change_type = f"Status changed: {prev.get('status')} → {entry['status']}" elif entry["severity"] != prev.get("severity"): must_post = True change_type = f"Severity changed: {prev.get('severity')} → {entry['severity']}" elif entry["updated"] != prev.get("updated"): must_post = True change_type = "Feed updated" elif entry["content"] != prev.get("content"): must_post = True change_type = "Content updated" if must_post: logger.info(f"[+] Posting update: {change_type} ({entry['title']})") payload = build_discord_payload(entry, change_type) post_to_discord(WEBHOOK_URL, payload) # Save latest state state[incident_id] = { "status": entry["status"], "severity": entry["severity"], "updated": entry["updated"], "content": entry["content"] } save_state(state) if __name__ == "__main__": main()