#!/usr/bin/env python3 import time import threading import os import logging import logging.handlers import socket import ax25 import ax25.ports import ax25.socket # Settings BASE_CALLSIGN = "KI5QKX-10" NETWORK_NAME = "HAMNET-HOUSTON" BEACON_INTERVAL = 10 # seconds BEACON_CALLSIGN = "QST-0" # IP pool for assigning to clients IP_POOL = [ "44.127.254.12", "44.127.254.13", "44.127.254.14", "44.127.254.15" ] LEASE_TIME = 3600 # seconds NETMASK = "24" GATEWAY = "44.127.254.1" DNS_SERVER = "44.127.254.1" active_leases = {} # Setup logging os.makedirs("logs", exist_ok=True) logger = logging.getLogger("craprniac") logger.setLevel(logging.INFO) log_handler = logging.handlers.TimedRotatingFileHandler( "logs/craprniac.log", when="midnight", interval=1, backupCount=7 ) log_handler.setFormatter(logging.Formatter( "%(asctime)s [%(levelname)s] %(message)s" )) logger.addHandler(log_handler) console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter( "%(asctime)s [%(levelname)s] %(message)s" )) logger.addHandler(console_handler) # Packet builders def build_beacon(): return f"0.1|CRAP_BEACON|{BASE_CALLSIGN}|{NETWORK_NAME}" def build_accept(client_callsign, ip_address): return f"0.1|CRAP_ACCEPT|{client_callsign}|{NETWORK_NAME}|{ip_address}/{NETMASK}|{GATEWAY}|{DNS_SERVER}|{LEASE_TIME}" # Beaconing loop def beacon_loop(beacon_sock): while True: try: # beacon_sock.connect(BEACON_CALLSIGN) beacon_sock.sendto(build_beacon().encode('utf-8'), "QST-0") logger.info("Sent beacon") except Exception as e: logger.error(f"Beacon send failed: {e}") time.sleep(BEACON_INTERVAL) # Handle a single client connection def handle_client(conn, addr): try: logger.info(f"[{addr}] Connection established.") while True: data = conn.recv(1024) if not data: logger.info(f"[{addr}] Connection closed.") break decoded = data.decode('utf-8') handle_request(decoded, addr, conn) except Exception as e: logger.error(f"[{addr}] Error: {e}") finally: conn.close() # Handle incoming requests def handle_request(data, src_callsign, conn): parts = data.split('|') if len(parts) < 2: logger.warning(f"[{src_callsign}] Received malformed message: {data}") return if parts[1] == "CRAP_BEACON": return if parts[1] != "CRAP_REQUEST": logger.warning(f"[{src_callsign}] Ignoring unknown message: {data}") return client_callsign = parts[2] network_requested = parts[3] if network_requested != NETWORK_NAME: logger.warning(f"[{client_callsign}] Requested wrong network ({network_requested}). Ignored.") return logger.info(f"[{client_callsign}] Join request received.") # Assign IP if client_callsign in active_leases: assigned_ip = active_leases[client_callsign] logger.info(f"[{client_callsign}] Already assigned {assigned_ip}.") elif IP_POOL: assigned_ip = IP_POOL.pop(0) active_leases[client_callsign] = assigned_ip logger.info(f"[{client_callsign}] Assigned new IP {assigned_ip}.") else: logger.error(f"[{client_callsign}] No available IPs to assign!") return # Send CRAP_ACCEPT try: conn.send(build_accept(client_callsign, assigned_ip).encode('utf-8')) logger.info(f"[{client_callsign}] Sent CRAP_ACCEPT.") except Exception as e: logger.error(f"[{client_callsign}] Failed to send CRAP_ACCEPT: {e}") # Main server function def main(): # Beaconing socket beacon_sock = ax25.socket.Socket(socket.SOCK_DGRAM) beacon_sock.bind(BASE_CALLSIGN) # Server listening socket server_sock = ax25.socket.Socket() server_sock.bind(BASE_CALLSIGN) server_sock.listen() logger.info(f"Base Station {BASE_CALLSIGN} starting up on {NETWORK_NAME}...") threading.Thread(target=beacon_loop, args=(beacon_sock,), daemon=True).start() while True: try: conn, addr = server_sock.accept() logger.info(f"Accepted connection from {addr}") client_thread = threading.Thread(target=handle_client, args=(conn, addr)) client_thread.daemon = True client_thread.start() except Exception as e: logger.error(f"Error accepting connection: {e}") if __name__ == "__main__": main()