554 lines
20 KiB
Python
554 lines
20 KiB
Python
###
|
|
# Copyright (c) 2023, John Burwell
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are met:
|
|
#
|
|
# * Redistributions of source code must retain the above copyright notice,
|
|
# this list of conditions, and the following disclaimer.
|
|
# * Redistributions in binary form must reproduce the above copyright notice,
|
|
# this list of conditions, and the following disclaimer in the
|
|
# documentation and/or other materials provided with the distribution.
|
|
# * Neither the name of the author of this software nor the name of
|
|
# contributors to this software may be used to endorse or promote products
|
|
# derived from this software without specific prior written consent.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
###
|
|
|
|
import enum
|
|
import json
|
|
import logging
|
|
import os
|
|
import random
|
|
import re
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Optional
|
|
|
|
import requests
|
|
import supybot
|
|
from supybot import callbacks, conf, ircmsgs, ircutils, schedule
|
|
from supybot.commands import *
|
|
|
|
try:
|
|
from supybot.i18n import PluginInternationalization
|
|
_ = PluginInternationalization('Chat')
|
|
except ImportError:
|
|
# Placeholder that allows to run the plugin on a bot
|
|
# without the i18n module
|
|
_ = lambda x: x
|
|
|
|
|
|
def truncate_messages(messages, max_tokens):
|
|
"""
|
|
Truncates the messages list to ensure the total token count does not exceed max_tokens.
|
|
|
|
Args:
|
|
messages (list): The list of message dictionaries to truncate.
|
|
max_tokens (int): The maximum number of tokens allowed.
|
|
|
|
Returns:
|
|
list: The truncated list of messages.
|
|
"""
|
|
total_tokens = 0
|
|
truncated = []
|
|
for message in reversed(messages):
|
|
# Approximate token count by splitting content into words
|
|
message_tokens = len(message["content"].split())
|
|
if total_tokens + message_tokens > max_tokens:
|
|
break
|
|
truncated.insert(0, message)
|
|
total_tokens += message_tokens
|
|
return truncated
|
|
|
|
|
|
class EngagementState(enum.Enum):
|
|
IDLE = "idle"
|
|
INVITED = "invited"
|
|
ENGAGED = "engaged"
|
|
COOLING = "cooling"
|
|
|
|
|
|
@dataclass
|
|
class ChannelSession:
|
|
state: EngagementState = EngagementState.IDLE
|
|
thread_owner: Optional[str] = None
|
|
last_invitation: float = 0.0
|
|
last_spoken: float = 0.0
|
|
last_user_msg: float = 0.0
|
|
replies_in_thread: int = 0
|
|
cooling_until: float = 0.0
|
|
|
|
def reset(self):
|
|
self.state = EngagementState.IDLE
|
|
self.thread_owner = None
|
|
self.last_invitation = 0.0
|
|
self.last_spoken = 0.0
|
|
self.last_user_msg = 0.0
|
|
self.replies_in_thread = 0
|
|
self.cooling_until = 0.0
|
|
|
|
|
|
CLASSIFIER_SYSTEM_PROMPT = (
|
|
"You evaluate whether an IRC bot should reply."
|
|
" Respond with exactly one word: reply or skip."
|
|
" Choose reply only if the bot's participation would be helpful,"
|
|
" expected, or keeps an active thread alive."
|
|
)
|
|
|
|
|
|
def is_poetry_block(lines):
|
|
if 2 < len(lines) <= 4:
|
|
avg_len = sum(len(l) for l in lines) / len(lines)
|
|
if avg_len < 20 and all(not l.endswith((".", "?", "!")) for l in lines):
|
|
return True
|
|
return False
|
|
|
|
|
|
class Chat(callbacks.Plugin):
|
|
"""Sends message to ChatGPT and replies with the response
|
|
"""
|
|
|
|
def __init__(self, irc):
|
|
self.__parent = super(Chat, self)
|
|
self.__parent.__init__(irc)
|
|
log_level = self.registryValue('log_level').upper()
|
|
self.log.setLevel(getattr(logging, log_level, logging.INFO))
|
|
self.log.info("Chat plugin initialized with log level: %s", log_level)
|
|
self.sessions: Dict[str, ChannelSession] = {}
|
|
|
|
def _send_line(self, irc, target, line):
|
|
if line.startswith("/me "):
|
|
irc.queueMsg(ircmsgs.action(target, line[4:].strip()))
|
|
else:
|
|
irc.reply(line, to=target)
|
|
|
|
def _burst(self, irc, target, lines, base_delay=0.6):
|
|
now = time.time()
|
|
for i, line in enumerate(lines[:3]): # never more than 3
|
|
delay = base_delay*i + random.uniform(0.05, 0.25)
|
|
schedule.addEvent(lambda l=line: self._send_line(irc, target, l),
|
|
now + delay)
|
|
|
|
def handle_response(self, irc, msg, response, session: Optional[ChannelSession] = None):
|
|
target = msg.args[0]
|
|
lines = [l.strip() for l in response.splitlines() if l.strip()]
|
|
if not lines:
|
|
return
|
|
|
|
if len(lines) == 1:
|
|
self._send_line(irc, target, lines[0])
|
|
elif is_poetry_block(lines):
|
|
# squash poem into single line
|
|
self._send_line(irc, target, " / ".join(lines))
|
|
else:
|
|
self._burst(irc, target, lines)
|
|
|
|
if session is not None:
|
|
now = time.time()
|
|
session.last_spoken = now
|
|
session.replies_in_thread += 1
|
|
if session.thread_owner is None:
|
|
session.thread_owner = msg.nick
|
|
if session.state != EngagementState.ENGAGED:
|
|
session.state = EngagementState.ENGAGED
|
|
|
|
def _invocation_string(self, irc):
|
|
return f"{conf.supybot.reply.whenAddressedBy.chars()}{self.name().lower()} "
|
|
|
|
def _is_command_invocation(self, irc, text):
|
|
stripped = text.strip()
|
|
lowered = stripped.lower()
|
|
plugin_name = self.name().lower()
|
|
nick_lower = irc.nick.lower()
|
|
|
|
prefix_chars = conf.supybot.reply.whenAddressedBy.chars()
|
|
for char in prefix_chars:
|
|
token = f"{char}{plugin_name} "
|
|
if lowered.startswith(token):
|
|
return True
|
|
|
|
if lowered.startswith(f"{plugin_name} "):
|
|
return True
|
|
|
|
address_patterns = (
|
|
f"{nick_lower}: {plugin_name} ",
|
|
f"{nick_lower}, {plugin_name} ",
|
|
f"@{nick_lower} {plugin_name} ",
|
|
)
|
|
for pattern in address_patterns:
|
|
if lowered.startswith(pattern):
|
|
return True
|
|
|
|
return False
|
|
|
|
def _session_key(self, irc, channel):
|
|
return f"{irc.network}:{channel.lower()}"
|
|
|
|
def _get_session(self, irc, channel):
|
|
key = self._session_key(irc, channel)
|
|
if key not in self.sessions:
|
|
self.sessions[key] = ChannelSession()
|
|
return self.sessions[key]
|
|
|
|
def _resolve_system_prompt(self, irc, channel):
|
|
default_prompt = "You are a helpful assistant."
|
|
prompt_file = self.registryValue("system_prompt_file")
|
|
if prompt_file and not os.path.isabs(prompt_file):
|
|
prompt_file = os.path.join(os.path.dirname(__file__), prompt_file)
|
|
|
|
try:
|
|
if prompt_file:
|
|
with open(prompt_file, "r") as f:
|
|
system_prompt = f.read()
|
|
else:
|
|
raise FileNotFoundError("No prompt file specified.")
|
|
except Exception as e:
|
|
self.log.error(f"Could not read prompt file: {e}")
|
|
system_prompt = self.registryValue("system_prompt") or default_prompt
|
|
|
|
system_prompt = system_prompt.replace("$bot_name", irc.nick).replace("$channel_name", channel)
|
|
|
|
passive_mode = self.registryValue("passive_mode")
|
|
passive_addendum = (self.registryValue("passive_prompt_addendum") or "").strip()
|
|
if passive_mode != "off" and passive_addendum:
|
|
passive_addendum = passive_addendum.replace("$bot_name", irc.nick).replace("$channel_name", channel)
|
|
system_prompt = f"{system_prompt.strip()}\n\n{passive_addendum}"
|
|
|
|
return system_prompt
|
|
|
|
def _collect_events(self, irc, channel, invocation_string, exclude_msg=None):
|
|
history_limit = self.registryValue("scrollback_lines")
|
|
events = []
|
|
for message in irc.state.history[-history_limit:]:
|
|
if message.command != 'PRIVMSG' or message.args[0] != channel:
|
|
continue
|
|
if exclude_msg is not None and message is exclude_msg:
|
|
continue
|
|
|
|
nick = message.nick or ""
|
|
cleaned = self.filter_prefix(message.args[1], invocation_string)
|
|
role = 'assistant' if ircutils.strEqual(nick, irc.nick) else 'user'
|
|
events.append({
|
|
"role": role,
|
|
"nick": nick,
|
|
"content": cleaned,
|
|
})
|
|
return events
|
|
|
|
def _events_to_messages(self, events):
|
|
rendered = []
|
|
for event in events:
|
|
if event["role"] == "assistant":
|
|
content = event["content"]
|
|
else:
|
|
speaker = event["nick"] or "user"
|
|
content = f"{speaker}: {event['content']}"
|
|
rendered.append({"role": event["role"], "content": content})
|
|
return rendered
|
|
|
|
def _build_messages(self, irc, msg, user_content, include_current=False):
|
|
invocation_string = self._invocation_string(irc)
|
|
events = self._collect_events(
|
|
irc,
|
|
msg.args[0],
|
|
invocation_string,
|
|
exclude_msg=None if include_current else msg,
|
|
)
|
|
|
|
system_prompt = self._resolve_system_prompt(irc, msg.args[0])
|
|
messages = [{"role": "system", "content": system_prompt}] + self._events_to_messages(events)
|
|
|
|
if include_current:
|
|
content = self.filter_prefix(user_content, invocation_string)
|
|
messages.append({"role": "user", "content": f"{msg.nick}: {content}"})
|
|
else:
|
|
messages.append({"role": "user", "content": user_content})
|
|
|
|
messages = truncate_messages(messages, 8192)
|
|
return messages, events
|
|
|
|
def _post_chat_completion(self, messages, max_tokens=None, temperature=None, model=None, timeout=10):
|
|
payload = {
|
|
"model": model or self.registryValue("model"),
|
|
"messages": messages,
|
|
}
|
|
if max_tokens is not None:
|
|
payload["max_tokens"] = max_tokens
|
|
if temperature is not None:
|
|
payload["temperature"] = temperature
|
|
|
|
api_key = self.registryValue('api_key')
|
|
if not api_key:
|
|
raise ValueError("API key not configured")
|
|
|
|
res = requests.post(
|
|
"https://api.openai.com/v1/chat/completions",
|
|
headers={
|
|
"Content-Type": "application/json",
|
|
"Authorization": f"Bearer {api_key}"
|
|
},
|
|
json=payload,
|
|
timeout=timeout
|
|
)
|
|
res.raise_for_status()
|
|
data = res.json()
|
|
|
|
if "error" in data:
|
|
raise RuntimeError(data["error"].get("message", "Unknown error"))
|
|
|
|
return data['choices'][0]['message']['content'].strip()
|
|
|
|
def _reset_if_stale(self, session, now):
|
|
engagement_timeout = self.registryValue("passive_engagement_timeout")
|
|
if session.state == EngagementState.ENGAGED and session.last_spoken:
|
|
if now - session.last_spoken > engagement_timeout:
|
|
session.state = EngagementState.COOLING
|
|
session.cooling_until = now + self.registryValue("passive_cooldown")
|
|
session.thread_owner = None
|
|
session.replies_in_thread = 0
|
|
if session.state == EngagementState.INVITED and session.last_invitation:
|
|
if now - session.last_invitation > engagement_timeout:
|
|
session.state = EngagementState.COOLING
|
|
session.cooling_until = now + self.registryValue("passive_cooldown")
|
|
session.thread_owner = None
|
|
session.replies_in_thread = 0
|
|
if session.state == EngagementState.COOLING and session.cooling_until:
|
|
if now >= session.cooling_until:
|
|
session.reset()
|
|
|
|
def _is_direct_mention(self, irc, text):
|
|
lowered = text.lower()
|
|
nick_lower = irc.nick.lower()
|
|
if lowered.startswith(f"{nick_lower}:") or lowered.startswith(f"{nick_lower},"):
|
|
return True
|
|
tokens = re.findall(r"[@]?[\w'-]+", lowered)
|
|
for token in tokens:
|
|
if token.lstrip('@') == nick_lower:
|
|
return True
|
|
return False
|
|
|
|
def _looks_like_question(self, text):
|
|
stripped = text.strip()
|
|
if stripped.endswith('?'):
|
|
return True
|
|
lowered = stripped.lower()
|
|
question_words = {"who", "what", "when", "where", "why", "how", "should", "could", "would"}
|
|
if any(lowered.startswith(word + " ") for word in question_words):
|
|
return True
|
|
return False
|
|
|
|
def _contains_trigger_word(self, text):
|
|
triggers = [w.lower() for w in self.registryValue("passive_trigger_words")]
|
|
if not triggers:
|
|
return False
|
|
words = set(re.findall(r"[\w']+", text.lower()))
|
|
return any(word in words for word in triggers)
|
|
|
|
def _classify_passive_trigger(self, irc, msg, events):
|
|
history_lines = []
|
|
for event in events[-6:]:
|
|
speaker = event["nick"] or irc.nick
|
|
if event["role"] == "assistant":
|
|
speaker = irc.nick
|
|
history_lines.append(f"{speaker}: {event['content']}")
|
|
prompt = (
|
|
"Recent IRC conversation:\n" + "\n".join(history_lines[-8:]) +
|
|
f"\n\nShould {irc.nick} reply? Respond with one word: reply or skip."
|
|
)
|
|
|
|
messages = [
|
|
{"role": "system", "content": CLASSIFIER_SYSTEM_PROMPT},
|
|
{"role": "user", "content": prompt},
|
|
]
|
|
|
|
try:
|
|
outcome = self._post_chat_completion(messages, max_tokens=4, temperature=0)
|
|
except Exception as e:
|
|
self.log.debug(f"Classifier fallback due to error: {e}")
|
|
return False
|
|
|
|
decision = outcome.strip().split()[0].lower()
|
|
return decision == "reply"
|
|
|
|
def _should_respond_passively(self, irc, msg, session, events):
|
|
passive_mode = self.registryValue("passive_mode")
|
|
if passive_mode == "off":
|
|
return False
|
|
|
|
text = msg.args[1]
|
|
now = time.time()
|
|
mention = self._is_direct_mention(irc, text)
|
|
|
|
if mention:
|
|
if session.state == EngagementState.COOLING:
|
|
session.reset()
|
|
session.thread_owner = msg.nick
|
|
session.last_invitation = now
|
|
session.state = EngagementState.INVITED
|
|
session.replies_in_thread = 0
|
|
return True
|
|
|
|
if session.state == EngagementState.COOLING:
|
|
return False
|
|
|
|
if session.state in (EngagementState.INVITED, EngagementState.ENGAGED) and session.thread_owner:
|
|
if ircutils.strEqual(msg.nick, session.thread_owner):
|
|
return True
|
|
|
|
if passive_mode == "mention":
|
|
return False
|
|
|
|
if session.state != EngagementState.IDLE:
|
|
return False
|
|
|
|
candidate = self._looks_like_question(text) or self._contains_trigger_word(text)
|
|
if not candidate:
|
|
return False
|
|
|
|
try:
|
|
probability = float(self.registryValue("passive_probability"))
|
|
except (TypeError, ValueError):
|
|
probability = 0.0
|
|
|
|
if probability <= 0:
|
|
return False
|
|
|
|
roll = random.random()
|
|
if roll > probability:
|
|
self.log.debug(f"Passive skip due to probability gate ({roll:.2f} > {probability:.2f})")
|
|
return False
|
|
|
|
if not self._classify_passive_trigger(irc, msg, events):
|
|
self.log.debug("Passive classifier opted to skip reply")
|
|
return False
|
|
|
|
session.thread_owner = msg.nick
|
|
session.last_invitation = now
|
|
session.state = EngagementState.INVITED
|
|
session.replies_in_thread = 0
|
|
return True
|
|
|
|
def doPrivmsg(self, irc, msg):
|
|
parent_do_privmsg = getattr(super(Chat, self), 'doPrivmsg', None)
|
|
if parent_do_privmsg:
|
|
parent_do_privmsg(irc, msg)
|
|
|
|
passive_mode = self.registryValue("passive_mode")
|
|
if passive_mode == "off":
|
|
return
|
|
|
|
target = msg.args[0]
|
|
if not ircutils.isChannel(target):
|
|
return
|
|
|
|
if ircutils.strEqual(msg.nick, irc.nick):
|
|
return
|
|
|
|
text = msg.args[1]
|
|
if self._is_command_invocation(irc, text):
|
|
return
|
|
|
|
if not self.registryValue('api_key'):
|
|
return
|
|
|
|
invocation_string = self._invocation_string(irc)
|
|
session = self._get_session(irc, target)
|
|
now = time.time()
|
|
session.last_user_msg = now
|
|
self._reset_if_stale(session, now)
|
|
|
|
messages, events = self._build_messages(irc, msg, text, include_current=True)
|
|
|
|
if not self._should_respond_passively(irc, msg, session, events):
|
|
return
|
|
|
|
max_replies = self.registryValue("passive_max_replies")
|
|
if max_replies >= 0 and session.replies_in_thread >= max_replies:
|
|
session.state = EngagementState.COOLING
|
|
session.cooling_until = now + self.registryValue("passive_cooldown")
|
|
session.thread_owner = None
|
|
session.replies_in_thread = 0
|
|
return
|
|
|
|
try:
|
|
response = self._post_chat_completion(messages, max_tokens=self.registryValue("max_tokens"))
|
|
except (ValueError, requests.exceptions.Timeout, requests.exceptions.HTTPError) as e:
|
|
self.log.debug(f"Passive reply aborted due to recoverable error: {e}")
|
|
return
|
|
except (requests.exceptions.RequestException, RuntimeError) as e:
|
|
self.log.debug(f"Passive reply aborted due to API error: {e}")
|
|
return
|
|
|
|
self.handle_response(irc, msg, response, session=session)
|
|
|
|
def filter_prefix(self, msg, prefix):
|
|
if msg.startswith(prefix):
|
|
return msg[len(prefix):]
|
|
else:
|
|
return msg
|
|
|
|
def chat(self, irc, msg, args, string):
|
|
"""
|
|
<message>
|
|
|
|
Sends a message to ChatGPT and returns the response. The bot will include recent
|
|
conversation history from the channel to provide context.
|
|
|
|
Example:
|
|
@bot chat What is the capital of France?
|
|
"""
|
|
|
|
max_tokens = self.registryValue("max_tokens")
|
|
session = self._get_session(irc, msg.args[0])
|
|
now = time.time()
|
|
session.last_user_msg = now
|
|
session.last_invitation = now
|
|
session.thread_owner = msg.nick
|
|
if session.state == EngagementState.COOLING:
|
|
session.reset()
|
|
session.state = EngagementState.INVITED
|
|
session.replies_in_thread = 0
|
|
|
|
messages, _ = self._build_messages(irc, msg, string, include_current=False)
|
|
self.log.debug(f"API Request: {json.dumps(messages)}")
|
|
|
|
try:
|
|
response = self._post_chat_completion(messages, max_tokens=max_tokens)
|
|
except ValueError as e:
|
|
self.log.error(f"Configuration error: {e}")
|
|
irc.reply("The API key is not configured. Please set it before using this command.")
|
|
return
|
|
except requests.exceptions.Timeout:
|
|
self.log.error("Request timed out.")
|
|
irc.reply("The request to the API timed out. Please try again later.")
|
|
return
|
|
except requests.exceptions.HTTPError as e:
|
|
self.log.error(f"HTTP error: {e}")
|
|
irc.reply("An HTTP error occurred while contacting the API.")
|
|
return
|
|
except (requests.exceptions.RequestException, RuntimeError) as e:
|
|
self.log.error(f"Request exception: {e}")
|
|
irc.reply("An error occurred while contacting the API.")
|
|
return
|
|
|
|
self.handle_response(irc, msg, response, session=session)
|
|
self.log.info(f"Successfully processed request for user {msg.nick} in channel {msg.args[0]}")
|
|
|
|
chat = wrap(chat, ['text'])
|
|
|
|
Class = Chat
|