#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""SQLite storage for MarxBot: per-user word counts, citations and birthdays."""

import logging
import sqlite3
from contextlib import contextmanager
from typing import Dict, Iterator, Optional

import discord

from bot.env_variable import path_sqldb

# Schema statements executed when a DBManager is created. Each one is
# idempotent (CREATE TABLE IF NOT EXISTS), so existing tables are left as-is.
_SCHEMA_STATEMENTS = (
    """
    CREATE TABLE IF NOT EXISTS people_words(
        user_id VARCHAR(100) NOT NULL,
        number_words INT,
        delta_words INT,
        PRIMARY KEY (user_id));
    """,
    """
    CREATE TABLE IF NOT EXISTS citations(
        citation_author VARCHAR(25) NOT NULL,
        citation_text VARCHAR(1000)
    );
    """,
    """
    CREATE TABLE IF NOT EXISTS birthday(
        birthday_id VARCHAR(100) NOT NULL,
        birthday_txt VARCHAR(100),
        PRIMARY KEY (birthday_id)
    );
    """,
)


@contextmanager
def _open_connection() -> Iterator[sqlite3.Connection]:
    """Yield a connection to the database at ``path_sqldb``.

    The transaction is committed when the ``with`` body finishes normally and
    rolled back when it raises; the connection is closed in both cases.
    """
    connexion = sqlite3.connect(path_sqldb)
    try:
        # ``with connexion`` only manages the transaction, it does not close
        # the connection, hence the surrounding try/finally.
        with connexion:
            yield connexion
    finally:
        connexion.close()


class DBManager:
    """Gateway to the SQLite database located at ``path_sqldb``.

    Every method opens its own short-lived connection, so instances hold no
    state.
    """

    def __init__(self):
        """Create the tables used by the bot if they do not exist yet."""
        with _open_connection() as connexion:
            for statement in _SCHEMA_STATEMENTS:
                connexion.execute(statement)

    async def add_numberword(self, message: discord.Message) -> None:
        """Add the word count of ``message`` to its author's counters.

        Words are the whitespace-separated tokens of ``message.content``. Both
        the total (``number_words``) and the running delta (``delta_words``)
        are increased; a row is created for authors seen for the first time.

        Note: the SQLite calls below are synchronous even though this method
        is a coroutine.
        """
        # user_id is a VARCHAR column, so the id is bound as text.
        author_id = str(message.author.id)
        word_count = len(message.content.split())

        with _open_connection() as connexion:
            # Increment inside the database rather than read-modify-write in
            # Python, so the update cannot overwrite a concurrent increment.
            # COALESCE guards against NULL counters (the columns are nullable).
            cursor = connexion.execute(
                "UPDATE people_words "
                "SET number_words = COALESCE(number_words, 0) + ?, "
                "delta_words = COALESCE(delta_words, 0) + ? "
                "WHERE user_id = ?;",
                (word_count, word_count, author_id),
            )
            # No row matched: the author is not registered yet.
            if cursor.rowcount == 0:
                connexion.execute(
                    "INSERT INTO people_words (user_id, number_words, delta_words) "
                    "VALUES (?, ?, ?);",
                    (author_id, word_count, word_count),
                )

    def get_numberwords(self, guild: discord.Guild) -> str:
        """Return the word-count leaderboard for ``guild`` as a code block.

        Rows are listed by decreasing ``number_words``; users that
        ``guild.get_member`` does not resolve are skipped. Once the text is
        built, ``delta_words`` is reset to 0 for every row of the table.
        """
        lines = ["```\nStatistique du nombre de mots écrits:\n"]

        with _open_connection() as connexion:
            rows = connexion.execute(
                "SELECT user_id, number_words, delta_words FROM people_words "
                "ORDER BY number_words DESC;"
            ).fetchall()

            for user_id, number_words, delta_words in rows:
                member = guild.get_member(int(user_id))
                if member is not None:
                    lines.append(
                        f"\t· {member.display_name} -- {number_words} mots (+{delta_words}).\n"
                    )

            # Once the output is created, we reset the delta words.
            connexion.execute("UPDATE people_words SET delta_words=0;")

        lines.append("```")
        return "".join(lines)

    def get_citation(self, author: Optional[str]) -> str:
        """Return one random citation, optionally restricted to ``author``.

        Args:
            author: exact ``citation_author`` value to filter on, or ``None``
                to draw from every citation.

        Returns:
            The citation formatted as ``« text » - author``, or a fallback
            message when no citation matches.
        """
        # Rows without text cannot be rendered as a citation, skip them.
        filters = ["citation_text IS NOT NULL"]
        parameters = []
        if author is not None:
            # Bound as a parameter: ``author`` comes from user input and must
            # never be spliced into the SQL text.
            filters.append("citation_author = ?")
            parameters.append(author)

        request = (
            "SELECT citation_author, citation_text FROM citations WHERE "
            + " AND ".join(filters)
            + " ORDER BY RANDOM() LIMIT 1;"
        )

        with _open_connection() as connexion:
            result = connexion.execute(request, parameters).fetchone()

        if result is None:
            return "Il n'y a pas de citation qui correspond à votre demande :c"
        return f"« {result[1]} » - {result[0]}"

    def add_birthday(self, birthday_id: str, birthday_date: str) -> None:
        """Store ``birthday_date`` for ``birthday_id``, replacing any previous value."""
        with _open_connection() as connexion:
            # birthday_id is the primary key, so OR REPLACE overwrites the
            # existing row instead of failing on the uniqueness constraint.
            connexion.execute(
                "INSERT OR REPLACE INTO birthday (birthday_id, birthday_txt) VALUES (?, ?)",
                (birthday_id, birthday_date),
            )

    def delete_birthday(self, birthday_id: str) -> None:
        """Remove the birthday stored for ``birthday_id``; no-op if there is none."""
        with _open_connection() as connexion:
            connexion.execute("DELETE FROM birthday WHERE birthday_id=?", (birthday_id,))

    def get_all_birthday(self) -> Dict[str, str]:
        """Return every stored birthday as ``{birthday_id: birthday_txt}``.

        Entries are inserted in increasing ``birthday_id`` order.
        """
        with _open_connection() as connexion:
            rows = connexion.execute(
                "SELECT birthday_id, birthday_txt FROM birthday ORDER BY birthday_id;"
            ).fetchall()
        return dict(rows)