#!/usr/bin/env python3 # -*- coding: utf-8 -*- # System packages import sqlite3 from pathlib import Path from typing import Optional, Dict # PyPi packages import discord def initialize(path_sqldb: Path) -> None: """Initialize (if needed) the database needed by the bot. :param path_sqldb: Path where is the SQLite3 DB. :return: None """ # If the table "people_words" does not exist, MarxBot creates it. table_peoplewords = """ CREATE TABLE IF NOT EXISTS people_words( user_id VARCHAR(100) NOT NULL, number_words INT, delta_words INT, PRIMARY KEY (user_id)); """ table_citation = """ CREATE TABLE IF NOT EXISTS citations( citation_author VARCHAR(25) NOT NULL, citation_text VARCHAR(1000) ); """ table_birthday = """ CREATE TABLE IF NOT EXISTS birthday( birthday_id VARCHAR(100) NOT NULL, birthday_txt VARCHAR(100), PRIMARY KEY (birthday_id) ); """ connexion = sqlite3.connect(path_sqldb) connexion.execute(table_peoplewords) connexion.execute(table_citation) connexion.execute(table_birthday) connexion.commit() connexion.close() async def add_numberword(path_sqldb: Path, message: discord.Message) -> None: """Add to the count of words of the message's author the number of words written in the message. :param path_sqldb: Path where is the SQLite3 DB. :param message: Message written by the person. :return: None """ author_id = message.author.id message_content = message.content request = f"SELECT number_words, delta_words FROM people_words WHERE user_id={author_id};" connexion = sqlite3.connect(path_sqldb) cursor = connexion.cursor() # Get the previous number of words from the author and update it. result = cursor.execute(request).fetchone() # If the author has not been registered in the table "people_words". if result is None: author_numberwords = len(message_content.split()) request = f"INSERT INTO people_words VALUES ({author_id}, {author_numberwords}, {author_numberwords});" # If the author is already registered in the table "people_words". else: author_numberwords = result[0] + len(message_content.split()) author_deltawords = result[1] + len(message_content.split()) request = f"UPDATE people_words SET number_words={author_numberwords}, delta_words={author_deltawords}" \ f" WHERE user_id={author_id};" cursor.execute(request) connexion.commit() connexion.close() return def get_numberwords(path_sqldb: Path, guild: discord.Guild) -> str: """Get the number of words that have been said by people, resets the delta number of words since the last time. :param path_sqldb: Path where is the SQLite3 DB. :param guild: The server in which are the people, to get their pseudonyms. :return: """ request = "SELECT user_id, number_words, delta_words FROM people_words ORDER BY number_words DESC;" numberwords_str = "```\nStatistique du nombre de mots écrits:\n" connexion = sqlite3.connect(path_sqldb) cursor = connexion.cursor() for user_id, number_words, delta_words in cursor.execute(request).fetchall(): member = guild.get_member(int(user_id)) if member is not None: numberwords_str += f"\t· {member.display_name} -- {number_words} mots (+{delta_words}).\n" # Once the output is created, we reset the delta words. request = "UPDATE people_words SET delta_words=0;" connexion.execute(request) connexion.commit() connexion.close() numberwords_str += "```" return numberwords_str def get_citation(path_sqldb: Path, author: Optional[str]) -> str: """Request to have one random entry in the table "citation". :param path_sqldb: Path where is the SQLite3 DB. :param author: Optional name of the author of the citation requested. :return: A random citation. """ assert author is None or author in ["Karl Marx", "Kadoc"] author_option = "" if author is not None: author_option = f"WHERE citation_author == \"{author}\"" request = "SELECT citation_author, citation_text FROM citations WHERE citation_text IN" + \ f"(SELECT citation_text FROM citations {author_option} ORDER BY RANDOM() LIMIT 1);" connexion = sqlite3.connect(path_sqldb) cursor = connexion.cursor() result = cursor.execute(request).fetchone() if result is None: citation_str = "Il n'y a pas de citation qui correspond à votre demande :c" else: citation_str = f"« {result[1]} » - {result[0]}" connexion.commit() connexion.close() return citation_str def add_birthday(path_sqldb: Path, birthday_id: str, birthday_date: str) -> None: """Add the birthdate of someone. :param path_sqldb: Path where is the SQLite3 DB. :param birthday_id: ID of the person. :param birthday_date: Birthdate of the person. :return: None """ connexion = sqlite3.connect(path_sqldb) cursor = connexion.cursor() exist = cursor.execute("SELECT * FROM birthday WHERE birthday_id=?", (birthday_id,)).fetchone() if exist is None: cursor.execute("INSERT INTO birthday VALUES (?, ?)", (birthday_id, birthday_date)) else: cursor.execute("UPDATE birthday SET birthday_txt=? WHERE birthday_id=?", (birthday_date, birthday_id)) connexion.commit() connexion.close() def delete_birthday(path_sqldb: Path, birthday_id: str) -> None: """Delete the birthday information of someone from the DB. :param path_sqldb: Path where is the SQLite3 DB. :param birthday_id: ID of the person. :return: None """ connexion = sqlite3.connect(path_sqldb) cursor = connexion.cursor() cursor.execute("DELETE FROM birthday WHERE birthday_id=?", (birthday_id,)) connexion.commit() connexion.close() def get_all_birthday(path_sqldb: Path) -> Dict[str, str]: """Prints all the birthday in the DB. :param path_sqldb: Path where is the SQLite3 DB. :param guild: The server in which are the people, to get their pseudonyms. :return: None """ request = "SELECT birthday_id, birthday_txt FROM birthday ORDER BY birthday_id;" birthday: Dict[str, str] = {} connexion = sqlite3.connect(path_sqldb) cursor = connexion.cursor() for user_id, birthday_txt in cursor.execute(request).fetchall(): birthday[user_id] = birthday_txt connexion.close() return birthday