### # Copyright (c) 2023, John Burwell # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright notice, # this list of conditions, and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions, and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of the author of this software nor the name of # contributors to this software may be used to endorse or promote products # derived from this software without specific prior written consent. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. ### import enum import json import logging import os import random import re import time from dataclasses import dataclass from typing import Dict, Optional import requests import supybot from supybot import callbacks, conf, ircmsgs, ircutils, schedule from supybot.commands import * try: from supybot.i18n import PluginInternationalization _ = PluginInternationalization('Chat') except ImportError: # Placeholder that allows to run the plugin on a bot # without the i18n module _ = lambda x: x def truncate_messages(messages, max_tokens): """ Truncates the messages list to ensure the total token count does not exceed max_tokens. Args: messages (list): The list of message dictionaries to truncate. max_tokens (int): The maximum number of tokens allowed. Returns: list: The truncated list of messages. """ total_tokens = 0 truncated = [] for message in reversed(messages): # Approximate token count by splitting content into words message_tokens = len(message["content"].split()) if total_tokens + message_tokens > max_tokens: break truncated.insert(0, message) total_tokens += message_tokens return truncated class EngagementState(enum.Enum): IDLE = "idle" INVITED = "invited" ENGAGED = "engaged" COOLING = "cooling" @dataclass class ChannelSession: state: EngagementState = EngagementState.IDLE thread_owner: Optional[str] = None last_invitation: float = 0.0 last_spoken: float = 0.0 last_user_msg: float = 0.0 replies_in_thread: int = 0 cooling_until: float = 0.0 def reset(self): self.state = EngagementState.IDLE self.thread_owner = None self.last_invitation = 0.0 self.last_spoken = 0.0 self.last_user_msg = 0.0 self.replies_in_thread = 0 self.cooling_until = 0.0 CLASSIFIER_SYSTEM_PROMPT = ( "You evaluate whether an IRC bot should reply." " Respond with exactly one word: reply or skip." " Choose reply only if the bot's participation would be helpful," " expected, or keeps an active thread alive." ) def is_poetry_block(lines): if 2 < len(lines) <= 4: avg_len = sum(len(l) for l in lines) / len(lines) if avg_len < 20 and all(not l.endswith((".", "?", "!")) for l in lines): return True return False class Chat(callbacks.Plugin): """Sends message to ChatGPT and replies with the response """ def __init__(self, irc): self.__parent = super(Chat, self) self.__parent.__init__(irc) log_level = self.registryValue('log_level').upper() self.log.setLevel(getattr(logging, log_level, logging.INFO)) self.log.info("Chat plugin initialized with log level: %s", log_level) self.sessions: Dict[str, ChannelSession] = {} def _send_line(self, irc, target, line): if line.startswith("/me "): irc.queueMsg(ircmsgs.action(target, line[4:].strip())) else: irc.reply(line, to=target) def _burst(self, irc, target, lines, base_delay=0.6): now = time.time() for i, line in enumerate(lines[:3]): # never more than 3 delay = base_delay*i + random.uniform(0.05, 0.25) schedule.addEvent(lambda l=line: self._send_line(irc, target, l), now + delay) def handle_response(self, irc, msg, response, session: Optional[ChannelSession] = None): target = msg.args[0] lines = [l.strip() for l in response.splitlines() if l.strip()] if not lines: return if len(lines) == 1: self._send_line(irc, target, lines[0]) elif is_poetry_block(lines): # squash poem into single line self._send_line(irc, target, " / ".join(lines)) else: self._burst(irc, target, lines) if session is not None: now = time.time() session.last_spoken = now session.replies_in_thread += 1 if session.thread_owner is None: session.thread_owner = msg.nick if session.state != EngagementState.ENGAGED: session.state = EngagementState.ENGAGED def _invocation_string(self, irc): return f"{conf.supybot.reply.whenAddressedBy.chars()}{self.name().lower()} " def _is_command_invocation(self, irc, text): stripped = text.strip() lowered = stripped.lower() plugin_name = self.name().lower() nick_lower = irc.nick.lower() prefix_chars = conf.supybot.reply.whenAddressedBy.chars() for char in prefix_chars: token = f"{char}{plugin_name} " if lowered.startswith(token): return True if lowered.startswith(f"{plugin_name} "): return True address_patterns = ( f"{nick_lower}: {plugin_name} ", f"{nick_lower}, {plugin_name} ", f"@{nick_lower} {plugin_name} ", ) for pattern in address_patterns: if lowered.startswith(pattern): return True return False def _session_key(self, irc, channel): return f"{irc.network}:{channel.lower()}" def _get_session(self, irc, channel): key = self._session_key(irc, channel) if key not in self.sessions: self.sessions[key] = ChannelSession() return self.sessions[key] def _resolve_system_prompt(self, irc, channel): default_prompt = "You are a helpful assistant." prompt_file = self.registryValue("system_prompt_file") if prompt_file and not os.path.isabs(prompt_file): prompt_file = os.path.join(os.path.dirname(__file__), prompt_file) try: if prompt_file: with open(prompt_file, "r") as f: system_prompt = f.read() else: raise FileNotFoundError("No prompt file specified.") except Exception as e: self.log.error(f"Could not read prompt file: {e}") system_prompt = self.registryValue("system_prompt") or default_prompt system_prompt = system_prompt.replace("$bot_name", irc.nick).replace("$channel_name", channel) passive_mode = self.registryValue("passive_mode") passive_addendum = (self.registryValue("passive_prompt_addendum") or "").strip() if passive_mode != "off" and passive_addendum: passive_addendum = passive_addendum.replace("$bot_name", irc.nick).replace("$channel_name", channel) system_prompt = f"{system_prompt.strip()}\n\n{passive_addendum}" return system_prompt def _collect_events(self, irc, channel, invocation_string, exclude_msg=None): history_limit = self.registryValue("scrollback_lines") events = [] for message in irc.state.history[-history_limit:]: if message.command != 'PRIVMSG' or message.args[0] != channel: continue if exclude_msg is not None and message is exclude_msg: continue nick = message.nick or "" cleaned = self.filter_prefix(message.args[1], invocation_string) role = 'assistant' if ircutils.strEqual(nick, irc.nick) else 'user' events.append({ "role": role, "nick": nick, "content": cleaned, }) return events def _events_to_messages(self, events): rendered = [] for event in events: if event["role"] == "assistant": content = event["content"] else: speaker = event["nick"] or "user" content = f"{speaker}: {event['content']}" rendered.append({"role": event["role"], "content": content}) return rendered def _history_enabled(self): return bool(self.registryValue("history_service_url").strip()) def _extract_history_query(self, text): lowered = text.lower() prefixes = ["history:", "log:", "logs:", "recap:"] for prefix in prefixes: if lowered.startswith(prefix): query = text[len(prefix):].strip() return True, query or None triggers = [t.lower() for t in self.registryValue("history_trigger_words") if t] for trigger in triggers: if trigger and trigger in lowered: return True, text phrase_triggers = ["what did", "when did", "who said", "last time", "earlier today"] for phrase in phrase_triggers: if phrase in lowered: return True, text return False, None def _history_request(self, irc, channel, query=None): base_url = self.registryValue("history_service_url").strip() if not base_url: return [] base_url = base_url.rstrip('/') endpoint = "/search" if query else "/recent" params = { "network": irc.network, "channel": channel, "limit": str(max(1, self.registryValue("history_result_limit"))), } include_files = self.registryValue("history_include_files") if include_files > 0: params["include_files"] = str(include_files) if query: params["q"] = query[:240] headers = {} token = self.registryValue("history_service_token").strip() if token: headers["Authorization"] = f"Bearer {token}" timeout = self.registryValue("history_service_timeout") url = f"{base_url}{endpoint}" try: response = requests.get(url, params=params, headers=headers, timeout=timeout) if response.status_code == 404: return [] response.raise_for_status() data = response.json() if isinstance(data, list): return data return [] except requests.RequestException as e: self.log.debug("History request failed | url=%s | error=%s", url, e) return [] def _format_history_block(self, irc, items): max_chars = max(0, self.registryValue("history_max_chars")) max_lines = max(1, self.registryValue("history_max_lines")) lines = [] seen = set() for item in items: ts = (item.get("ts") or "").strip() nick = (item.get("nick") or "").strip() text = (item.get("text") or "").strip() if not text: continue if nick and ircutils.strEqual(nick, irc.nick): continue fragment = f"{ts} {nick}: {text}".strip() if fragment in seen: continue seen.add(fragment) lines.append(fragment) if len(lines) >= max_lines: break buffer = [] used = 0 for line in lines: delta = len(line) + (1 if buffer else 0) if max_chars and used + delta > max_chars: break buffer.append(line) used += delta if not buffer: return None return "Recent channel facts:\n" + "\n".join(buffer) def _maybe_add_history_context(self, irc, msg, messages, events, user_text): if not user_text or not self._history_enabled(): return messages should_query, query = self._extract_history_query(user_text) if not should_query: return messages items = self._history_request(irc, msg.args[0], query) if not items and query: items = self._history_request(irc, msg.args[0], None) if not items: self.log.debug("History lookup returned no items | channel=%s", msg.args[0]) return messages block = self._format_history_block(irc, items) if not block: return messages insert_at = len(messages) - 1 if messages else 0 if insert_at < 0: insert_at = 0 messages.insert(insert_at, {"role": "system", "content": block}) self.log.debug("History context appended | channel=%s | lines=%d", msg.args[0], block.count('\n')) return messages def _build_messages(self, irc, msg, user_content, include_current=False): invocation_string = self._invocation_string(irc) events = self._collect_events( irc, msg.args[0], invocation_string, exclude_msg=None if include_current else msg, ) system_prompt = self._resolve_system_prompt(irc, msg.args[0]) messages = [{"role": "system", "content": system_prompt}] + self._events_to_messages(events) if include_current: content = self.filter_prefix(user_content, invocation_string) messages.append({"role": "user", "content": f"{msg.nick}: {content}"}) else: messages.append({"role": "user", "content": user_content}) messages = truncate_messages(messages, 8192) return messages, events def _post_chat_completion(self, messages, max_tokens=None, temperature=None, model=None, timeout=10): payload = { "model": model or self.registryValue("model"), "messages": messages, } if max_tokens is not None: payload["max_tokens"] = max_tokens if temperature is not None: payload["temperature"] = temperature api_key = self.registryValue('api_key') if not api_key: raise ValueError("API key not configured") res = requests.post( "https://api.openai.com/v1/chat/completions", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" }, json=payload, timeout=timeout ) res.raise_for_status() data = res.json() if "error" in data: raise RuntimeError(data["error"].get("message", "Unknown error")) return data['choices'][0]['message']['content'].strip() def _reset_if_stale(self, session, now): engagement_timeout = self.registryValue("passive_engagement_timeout") if session.state == EngagementState.ENGAGED and session.last_spoken: if now - session.last_spoken > engagement_timeout: session.state = EngagementState.COOLING session.cooling_until = now + self.registryValue("passive_cooldown") session.thread_owner = None session.replies_in_thread = 0 if session.state == EngagementState.INVITED and session.last_invitation: if now - session.last_invitation > engagement_timeout: session.state = EngagementState.COOLING session.cooling_until = now + self.registryValue("passive_cooldown") session.thread_owner = None session.replies_in_thread = 0 if session.state == EngagementState.COOLING and session.cooling_until: if now >= session.cooling_until: session.reset() def _is_direct_mention(self, irc, text): lowered = text.lower() nick_lower = irc.nick.lower() if lowered.startswith(f"{nick_lower}:") or lowered.startswith(f"{nick_lower},"): return True tokens = re.findall(r"[@]?[\w'-]+", lowered) for token in tokens: if token.lstrip('@') == nick_lower: return True return False def _looks_like_question(self, text): stripped = text.strip() if stripped.endswith('?'): return True lowered = stripped.lower() question_words = {"who", "what", "when", "where", "why", "how", "should", "could", "would"} if any(lowered.startswith(word + " ") for word in question_words): return True return False def _contains_trigger_word(self, text): triggers = [w.lower() for w in self.registryValue("passive_trigger_words")] if not triggers: return False words = set(re.findall(r"[\w']+", text.lower())) return any(word in words for word in triggers) def _classify_passive_trigger(self, irc, msg, events): history_lines = [] for event in events[-6:]: speaker = event["nick"] or irc.nick if event["role"] == "assistant": speaker = irc.nick history_lines.append(f"{speaker}: {event['content']}") prompt = ( "Recent IRC conversation:\n" + "\n".join(history_lines[-8:]) + f"\n\nShould {irc.nick} reply? Respond with one word: reply or skip." ) messages = [ {"role": "system", "content": CLASSIFIER_SYSTEM_PROMPT}, {"role": "user", "content": prompt}, ] try: outcome = self._post_chat_completion(messages, max_tokens=4, temperature=0) except Exception as e: self.log.debug(f"Classifier fallback due to error: {e}") return False decision = outcome.strip().split()[0].lower() self.log.debug( "Passive classifier outcome | decision=%s | raw=%s | channel=%s", decision, outcome.strip(), msg.args[0], ) return decision == "reply" def _should_respond_passively(self, irc, msg, session, events): passive_mode = self.registryValue("passive_mode") if passive_mode == "off": return False text = msg.args[1] now = time.time() mention = self._is_direct_mention(irc, text) self.log.debug( "Passive evaluate | mode=%s | state=%s | nick=%s | channel=%s | mention=%s | replies=%s", passive_mode, session.state.value, msg.nick, msg.args[0], mention, session.replies_in_thread, ) if mention: if session.state == EngagementState.COOLING: session.reset() self.log.debug( "Passive reset session for mention during cooldown | channel=%s", msg.args[0], ) session.thread_owner = msg.nick session.last_invitation = now session.state = EngagementState.INVITED session.replies_in_thread = 0 self.log.debug("Passive accepted mention | channel=%s", msg.args[0]) return True if session.state == EngagementState.COOLING: self.log.debug( "Passive skip: cooling | channel=%s | until=%.2f | now=%.2f", msg.args[0], session.cooling_until, now, ) return False if session.state in (EngagementState.INVITED, EngagementState.ENGAGED) and session.thread_owner: if ircutils.strEqual(msg.nick, session.thread_owner): self.log.debug( "Passive continuing thread with owner | channel=%s | owner=%s", msg.args[0], session.thread_owner, ) return True if passive_mode == "mention": self.log.debug("Passive skip: mention mode without mention | channel=%s", msg.args[0]) return False if session.state != EngagementState.IDLE: self.log.debug( "Passive skip: state not idle | channel=%s | state=%s", msg.args[0], session.state.value, ) return False candidate = self._looks_like_question(text) or self._contains_trigger_word(text) if not candidate: self.log.debug("Passive skip: heuristics failed | channel=%s", msg.args[0]) return False try: probability = float(self.registryValue("passive_probability")) except (TypeError, ValueError): probability = 0.0 if probability <= 0: self.log.debug("Passive skip: probability disabled | channel=%s", msg.args[0]) return False roll = random.random() if roll > probability: self.log.debug( "Passive skip: probability gate | roll=%.2f | threshold=%.2f | channel=%s", roll, probability, msg.args[0], ) return False if not self._classify_passive_trigger(irc, msg, events): self.log.debug("Passive skip: classifier veto | channel=%s", msg.args[0]) return False session.thread_owner = msg.nick session.last_invitation = now session.state = EngagementState.INVITED session.replies_in_thread = 0 self.log.debug("Passive proceed: classifier approved | channel=%s", msg.args[0]) return True def doPrivmsg(self, irc, msg): parent_do_privmsg = getattr(super(Chat, self), 'doPrivmsg', None) if parent_do_privmsg: parent_do_privmsg(irc, msg) passive_mode = self.registryValue("passive_mode") if passive_mode == "off": return target = msg.args[0] if not ircutils.isChannel(target): return if ircutils.strEqual(msg.nick, irc.nick): return text = msg.args[1] if self._is_command_invocation(irc, text): return if not self.registryValue('api_key'): return invocation_string = self._invocation_string(irc) session = self._get_session(irc, target) now = time.time() session.last_user_msg = now self._reset_if_stale(session, now) messages, events = self._build_messages(irc, msg, text, include_current=True) self.log.debug("Passive base context: %s", json.dumps(messages)) if not self._should_respond_passively(irc, msg, session, events): return max_replies = self.registryValue("passive_max_replies") if max_replies >= 0 and session.replies_in_thread >= max_replies: session.state = EngagementState.COOLING session.cooling_until = now + self.registryValue("passive_cooldown") session.thread_owner = None session.replies_in_thread = 0 self.log.debug( "Passive skip: reached max replies | channel=%s | max=%s", msg.args[0], max_replies, ) return messages = self._maybe_add_history_context(irc, msg, messages, events, text) messages = truncate_messages(messages, 8192) self.log.debug("Passive API Request: %s", json.dumps(messages)) try: response = self._post_chat_completion(messages, max_tokens=self.registryValue("max_tokens")) except (ValueError, requests.exceptions.Timeout, requests.exceptions.HTTPError) as e: self.log.debug(f"Passive reply aborted due to recoverable error: {e}") return except (requests.exceptions.RequestException, RuntimeError) as e: self.log.debug(f"Passive reply aborted due to API error: {e}") return self.handle_response(irc, msg, response, session=session) self.log.debug("Passive reply sent | channel=%s", msg.args[0]) def filter_prefix(self, msg, prefix): if msg.startswith(prefix): return msg[len(prefix):] else: return msg def chat(self, irc, msg, args, string): """ Sends a message to ChatGPT and returns the response. The bot will include recent conversation history from the channel to provide context. Example: @bot chat What is the capital of France? """ max_tokens = self.registryValue("max_tokens") session = self._get_session(irc, msg.args[0]) now = time.time() session.last_user_msg = now session.last_invitation = now session.thread_owner = msg.nick if session.state == EngagementState.COOLING: session.reset() session.state = EngagementState.INVITED session.replies_in_thread = 0 messages, events = self._build_messages(irc, msg, string, include_current=False) self.log.debug(f"API base context: {json.dumps(messages)}") messages = self._maybe_add_history_context(irc, msg, messages, events, string) messages = truncate_messages(messages, 8192) self.log.debug(f"API Request: {json.dumps(messages)}") try: response = self._post_chat_completion(messages, max_tokens=max_tokens) except ValueError as e: self.log.error(f"Configuration error: {e}") irc.reply("The API key is not configured. Please set it before using this command.") return except requests.exceptions.Timeout: self.log.error("Request timed out.") irc.reply("The request to the API timed out. Please try again later.") return except requests.exceptions.HTTPError as e: self.log.error(f"HTTP error: {e}") irc.reply("An HTTP error occurred while contacting the API.") return except (requests.exceptions.RequestException, RuntimeError) as e: self.log.error(f"Request exception: {e}") irc.reply("An error occurred while contacting the API.") return self.handle_response(irc, msg, response, session=session) self.log.info(f"Successfully processed request for user {msg.nick} in channel {msg.args[0]}") chat = wrap(chat, ['text']) Class = Chat