From d8986643de315691d7ef3b43ac184698ac7690ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Abiga=C3=ABlle=20Martin?= Date: Fri, 12 Sep 2025 10:10:08 +0200 Subject: [PATCH] Bot: Refactored DB management. --- bot/dbmanager.py | 268 ++++++++++++++++++++++++++------------------ bot/env_variable.py | 7 -- bot/main.py | 32 ++++-- bot/marxbot.py | 41 ++++--- 4 files changed, 207 insertions(+), 141 deletions(-) delete mode 100644 bot/env_variable.py diff --git a/bot/dbmanager.py b/bot/dbmanager.py index 88c10de..62000e6 100644 --- a/bot/dbmanager.py +++ b/bot/dbmanager.py @@ -1,153 +1,199 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import logging +# System packages import sqlite3 +from pathlib import Path from typing import Optional, Dict +# PyPi packages import discord -from bot.env_variable import path_sqldb + +def initialize(path_sqldb: Path) -> None: + """Initialize (if needed) the database needed by the bot. + + :param path_sqldb: Path where is the SQLite3 DB. + :return: None + """ + # If the table "people_words" does not exist, MarxBot creates it. + table_peoplewords = """ + CREATE TABLE IF NOT EXISTS people_words( + user_id VARCHAR(100) NOT NULL, + number_words INT, + delta_words INT, + PRIMARY KEY (user_id)); + """ + table_citation = """ + CREATE TABLE IF NOT EXISTS citations( + citation_author VARCHAR(25) NOT NULL, + citation_text VARCHAR(1000) + ); + """ + table_birthday = """ + CREATE TABLE IF NOT EXISTS birthday( + birthday_id VARCHAR(100) NOT NULL, + birthday_txt VARCHAR(100), + PRIMARY KEY (birthday_id) + ); + """ + + connexion = sqlite3.connect(path_sqldb) + connexion.execute(table_peoplewords) + connexion.execute(table_citation) + connexion.execute(table_birthday) + connexion.commit() + connexion.close() -class DBManager: +async def add_numberword(path_sqldb: Path, message: discord.Message) -> None: + """Add to the count of words of the message's author the number of words written in the message. - def __init__(self): - # If the table "people_words" does not exists, MarxBot creates it. - table_peoplewords = """ - CREATE TABLE IF NOT EXISTS people_words( - user_id VARCHAR(100) NOT NULL, - number_words INT, - delta_words INT, - PRIMARY KEY (user_id)); - """ - table_citation = """ - CREATE TABLE IF NOT EXISTS citations( - citation_author VARCHAR(25) NOT NULL, - citation_text VARCHAR(1000) - ); - """ - table_birthday = """ - CREATE TABLE IF NOT EXISTS birthday( - birthday_id VARCHAR(100) NOT NULL, - birthday_txt VARCHAR(100), - PRIMARY KEY (birthday_id) - ); - """ + :param path_sqldb: Path where is the SQLite3 DB. + :param message: Message written by the person. + :return: None + """ + author_id = message.author.id + message_content = message.content + request = f"SELECT number_words, delta_words FROM people_words WHERE user_id={author_id};" - connexion = sqlite3.connect(path_sqldb) - connexion.execute(table_peoplewords) - connexion.execute(table_citation) - connexion.execute(table_birthday) - connexion.commit() - connexion.close() + connexion = sqlite3.connect(path_sqldb) + cursor = connexion.cursor() - async def add_numberword(self, message: discord.Message) -> None: - author_id = message.author.id - message_content = message.content - request = f"SELECT number_words, delta_words FROM people_words WHERE user_id={author_id};" + # Get the previous number of words from the author and update it. + result = cursor.execute(request).fetchone() - connexion = sqlite3.connect(path_sqldb) - cursor = connexion.cursor() + # If the author has not been registered in the table "people_words". + if result is None: + author_numberwords = len(message_content.split()) + request = f"INSERT INTO people_words VALUES ({author_id}, {author_numberwords}, {author_numberwords});" - # Get the previous number of words from the author and update it. - result = cursor.execute(request).fetchone() + # If the author is already registered in the table "people_words". + else: + author_numberwords = result[0] + len(message_content.split()) + author_deltawords = result[1] + len(message_content.split()) + request = f"UPDATE people_words SET number_words={author_numberwords}, delta_words={author_deltawords}" \ + f" WHERE user_id={author_id};" - # If the author has not been registered in the table "people_words". - if result is None: - author_numberwords = len(message_content.split()) - request = f"INSERT INTO people_words VALUES ({author_id}, {author_numberwords}, {author_numberwords});" + cursor.execute(request) - # If the author is already registered in the table "people_words". - else: - author_numberwords = result[0] + len(message_content.split()) - author_deltawords = result[1] + len(message_content.split()) - request = f"UPDATE people_words SET number_words={author_numberwords}, delta_words={author_deltawords}" \ - f" WHERE user_id={author_id};" + connexion.commit() + connexion.close() - cursor.execute(request) + return - connexion.commit() - connexion.close() - return +def get_numberwords(path_sqldb: Path, guild: discord.Guild) -> str: + """Get the number of words that have been said by people, resets the delta number of words since the last time. - def get_numberwords(self, guild: discord.Guild) -> str: - request = "SELECT user_id, number_words, delta_words FROM people_words ORDER BY number_words DESC;" - numberwords_str = "```\nStatistique du nombre de mots écrits:\n" + :param path_sqldb: Path where is the SQLite3 DB. + :param guild: The server in which are the people, to get their pseudonyms. + :return: + """ + request = "SELECT user_id, number_words, delta_words FROM people_words ORDER BY number_words DESC;" + numberwords_str = "```\nStatistique du nombre de mots écrits:\n" - connexion = sqlite3.connect(path_sqldb) - cursor = connexion.cursor() + connexion = sqlite3.connect(path_sqldb) + cursor = connexion.cursor() - for user_id, number_words, delta_words in cursor.execute(request).fetchall(): - member = guild.get_member(int(user_id)) - if member is not None: - numberwords_str += f"\t· {member.display_name} -- {number_words} mots (+{delta_words}).\n" + for user_id, number_words, delta_words in cursor.execute(request).fetchall(): + member = guild.get_member(int(user_id)) + if member is not None: + numberwords_str += f"\t· {member.display_name} -- {number_words} mots (+{delta_words}).\n" - # Once the output is created, we reset the delta words. - request = "UPDATE people_words SET delta_words=0;" - connexion.execute(request) + # Once the output is created, we reset the delta words. + request = "UPDATE people_words SET delta_words=0;" + connexion.execute(request) - connexion.commit() - connexion.close() + connexion.commit() + connexion.close() - numberwords_str += "```" - return numberwords_str + numberwords_str += "```" + return numberwords_str - def get_citation(self, author: Optional[str]) -> str: - # Request to have one random entry in the table "citation". - author_option = "" - if author is not None: - author_option = f"WHERE citation_author == \"{author}\"" +def get_citation(path_sqldb: Path, author: Optional[str]) -> str: + """Request to have one random entry in the table "citation". - request = "SELECT citation_author, citation_text FROM citations WHERE citation_text IN" + \ - f"(SELECT citation_text FROM citations {author_option} ORDER BY RANDOM() LIMIT 1);" + :param path_sqldb: Path where is the SQLite3 DB. + :param author: Optional name of the author of the citation requested. + :return: A random citation. + """ + assert author is None or author in ["Karl Marx", "Kadoc"] - connexion = sqlite3.connect(path_sqldb) - cursor = connexion.cursor() + author_option = "" + if author is not None: + author_option = f"WHERE citation_author == \"{author}\"" - result = cursor.execute(request).fetchone() - if result is None: - citation_str = "Il n'y a pas de citation qui correspond à votre demande :c" - else: - citation_str = f"« {result[1]} » - {result[0]}" + request = "SELECT citation_author, citation_text FROM citations WHERE citation_text IN" + \ + f"(SELECT citation_text FROM citations {author_option} ORDER BY RANDOM() LIMIT 1);" - connexion.commit() - connexion.close() + connexion = sqlite3.connect(path_sqldb) + cursor = connexion.cursor() - return citation_str + result = cursor.execute(request).fetchone() + if result is None: + citation_str = "Il n'y a pas de citation qui correspond à votre demande :c" + else: + citation_str = f"« {result[1]} » - {result[0]}" - def add_birthday(self, birthday_id: str, birthday_date: str) -> None: - connexion = sqlite3.connect(path_sqldb) - cursor = connexion.cursor() + connexion.commit() + connexion.close() - exist = cursor.execute("SELECT * FROM birthday WHERE birthday_id=?", (birthday_id,)).fetchone() - if exist is None: - cursor.execute("INSERT INTO birthday VALUES (?, ?)", (birthday_id, birthday_date)) - else: - cursor.execute("UPDATE birthday SET birthday_txt=? WHERE birthday_id=?", (birthday_date, birthday_id)) + return citation_str - connexion.commit() - connexion.close() - def delete_birthday(self, birthday_id: str) -> None: - connexion = sqlite3.connect(path_sqldb) - cursor = connexion.cursor() +def add_birthday(path_sqldb: Path, birthday_id: str, birthday_date: str) -> None: + """Add the birthdate of someone. - cursor.execute("DELETE FROM birthday WHERE birthday_id=?", (birthday_id,)) + :param path_sqldb: Path where is the SQLite3 DB. + :param birthday_id: ID of the person. + :param birthday_date: Birthdate of the person. + :return: None + """ + connexion = sqlite3.connect(path_sqldb) + cursor = connexion.cursor() - connexion.commit() - connexion.close() + exist = cursor.execute("SELECT * FROM birthday WHERE birthday_id=?", (birthday_id,)).fetchone() + if exist is None: + cursor.execute("INSERT INTO birthday VALUES (?, ?)", (birthday_id, birthday_date)) + else: + cursor.execute("UPDATE birthday SET birthday_txt=? WHERE birthday_id=?", (birthday_date, birthday_id)) - def get_all_birthday(self) -> Dict[str, str]: - request = "SELECT birthday_id, birthday_txt FROM birthday ORDER BY birthday_id;" - birthday: Dict[str, str] = {} + connexion.commit() + connexion.close() - connexion = sqlite3.connect(path_sqldb) - cursor = connexion.cursor() - for birthday_id, birthday_txt in cursor.execute(request).fetchall(): - birthday[birthday_id] = birthday_txt - connexion.close() - return birthday +def delete_birthday(path_sqldb: Path, birthday_id: str) -> None: + """Delete the birthday information of someone from the DB. + + :param path_sqldb: Path where is the SQLite3 DB. + :param birthday_id: ID of the person. + :return: None + """ + connexion = sqlite3.connect(path_sqldb) + cursor = connexion.cursor() + + cursor.execute("DELETE FROM birthday WHERE birthday_id=?", (birthday_id,)) + + connexion.commit() + connexion.close() + + +def get_all_birthday(path_sqldb: Path) -> Dict[str, str]: + """Prints all the birthday in the DB. + + :param path_sqldb: Path where is the SQLite3 DB. + :param guild: The server in which are the people, to get their pseudonyms. + :return: None + """ + request = "SELECT birthday_id, birthday_txt FROM birthday ORDER BY birthday_id;" + birthday: Dict[str, str] = {} + + connexion = sqlite3.connect(path_sqldb) + cursor = connexion.cursor() + for user_id, birthday_txt in cursor.execute(request).fetchall(): + birthday[user_id] = birthday_txt + connexion.close() + + return birthday diff --git a/bot/env_variable.py b/bot/env_variable.py deleted file mode 100644 index 469fbe1..0000000 --- a/bot/env_variable.py +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - - -path_project = "/home/toshuumilia/marxbot/" -path_sqldb = path_project + "marxbot.sqlite" -path_log = path_project + "marxbot.log" diff --git a/bot/main.py b/bot/main.py index 10a3bac..c16ae99 100644 --- a/bot/main.py +++ b/bot/main.py @@ -1,16 +1,26 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +# System packages +import json import logging import sys +from pathlib import Path +from typing import Dict, Any +# PyPi packages import discord -from bot.env_variable import path_log +# Project packages from bot.marxbot import MarxBot -if __name__ == '__main__': - # Activating logging + +def setup_logger(path_log: Path) -> None: + """Setup of the logger to keep logs in the given file. + + :param path_log: Path to file where the logs should be kept. + :return: None + """ logging.basicConfig(filename=path_log, filemode="a+", level=logging.INFO, format="[%(asctime)s][%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S") handler = logging.StreamHandler(sys.stdout) @@ -18,14 +28,22 @@ if __name__ == '__main__': handler.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s')) logging.getLogger().addHandler(handler) - # Launching MarxBot + +if __name__ == '__main__': + # Load the bot configuration. + with open(Path("configuration.json"), "r") as file_config: + botconfig: Dict[str, Any] = json.loads(file_config.read()) + + # Activate the logs + setup_logger(Path(botconfig["path_root"]) / botconfig["path_log"]) + + # Launch MarxBot logging.info("MarxBot is launching.") - token = sys.argv[1] intents = discord.Intents.all() - client = MarxBot(dev_marx=False, intents=intents) + client = MarxBot(botconfig=botconfig, intents=intents) try: - client.run(token) + client.run(botconfig["token"]) except Exception: logging.error("Something went bad !", exc_info=True) raise diff --git a/bot/marxbot.py b/bot/marxbot.py index 3309f57..f2f8c7d 100644 --- a/bot/marxbot.py +++ b/bot/marxbot.py @@ -1,19 +1,29 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +# System packages import logging +from pathlib import Path +from typing import Dict, Any +# PyPi packages import discord -from bot.dbmanager import DBManager +import bot.dbmanager as dbmanager class MarxBot(discord.Client): - def __init__(self, dev_marx: bool = False, *args, **kwargs): + def __init__(self, botconfig: Dict[str, Any], *args, **kwargs): super().__init__(*args, **kwargs) - self.dbmanager = DBManager() + # Paths for various information. + self.path_root: Path = Path(botconfig["path_root"]) + self.path_sqldb: Path = self.path_root / botconfig["path_sqldb"] + + # Prefix of every command to call the bot. + self.command = botconfig["command"] + self.subcommand = { "cite": { "--marx": "Karl Marx", @@ -21,18 +31,17 @@ class MarxBot(discord.Client): } } - self.command = "!marx" - if dev_marx: - self.command = "!dev-marx" - - # Server Tea-Tanches or Obi'lia + # Answer requests from these servers only (Tea-Tanches or Obi'lia) self.authorized_servers = [93251981056970752, 363070323412697088] + # Initialize the DB. + dbmanager.initialize(self.path_sqldb) + async def on_ready(self): logging.info(f"MarxBot is ready as {self.user}") async def on_message(self, message): - # We don't want respond to ourselves + # Avoid responding to ourselves if message.author == self.user: return @@ -41,13 +50,13 @@ class MarxBot(discord.Client): msg_guild = message.guild msg_author = message.author - # If the bot is called from an unauthorized server: + # If called from an unauthorized server: if msg_guild is None or msg_guild.id not in self.authorized_servers: return # We record the number of words from each member coming from the guild "Les Tea-Tanches". if msg_guild is not None and msg_guild.id == 93251981056970752: - await self.dbmanager.add_numberword(message) + await dbmanager.add_numberword(self.path_sqldb, message) # The bot is not called by someone. if not msg_txt.startswith(self.command): @@ -106,7 +115,7 @@ class MarxBot(discord.Client): async def send_msg_stats(self, msg_channel: discord.TextChannel, msg_guild: discord.Guild): # Server Tea-Tanches or Obi'lia if msg_guild.id == 93251981056970752 or msg_guild.id == 363070323412697088: - await msg_channel.send(self.dbmanager.get_numberwords(msg_guild)) + await msg_channel.send(dbmanager.get_numberwords(self.path_sqldb, msg_guild)) async def send_msg_citation(self, msg_channel: discord.TextChannel, msg_txt: str): options = msg_txt.split() @@ -116,7 +125,7 @@ class MarxBot(discord.Client): elif "--kadoc" in options: author = self.subcommand.get("cite").get("--kadoc") - await msg_channel.send(self.dbmanager.get_citation(author)) + await msg_channel.send(dbmanager.get_citation(self.path_sqldb, author)) async def send_msg_music(self, msg_channel: discord.TextChannel): await msg_channel.send(":notes: https://www.youtube.com/watch?v=wKDD1H-Hlpc :musical_note:") @@ -130,13 +139,13 @@ class MarxBot(discord.Client): logging.warning(f"No --add or --ajouter parameter found: {msg_txt}") return - self.dbmanager.add_birthday(msg_author.id, birthday_txt) + dbmanager.add_birthday(self.path_sqldb, str(msg_author.id), birthday_txt) def delete_birthday(self, msg_author: discord.Member): - self.dbmanager.delete_birthday(msg_author.id) + dbmanager.delete_birthday(self.path_sqldb, str(msg_author.id)) async def send_msg_birthday(self, msg_channel: discord.TextChannel, msg_guild: discord.Guild): - birthday = self.dbmanager.get_all_birthday() + birthday = dbmanager.get_all_birthday(self.path_sqldb) birthday_str = "```\nJours d'anniversaire :\n" for birthday_id, birthday_txt in birthday.items():