MarxBot/bot/dbmanager.py

154 lines
5.4 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import sqlite3
from contextlib import closing
from typing import Optional, Dict

import discord

from bot.env_variable import path_sqldb
class DBManager:
    """Persist MarxBot data (word counts, citations, birthdays) in SQLite.

    Every method opens its own short-lived connection to the database file
    and closes it before returning, even when a query raises. All values
    coming from callers are passed to SQLite as bound parameters, never
    interpolated into the SQL text.

    Annotations that reference ``discord`` are written as strings so they
    are not evaluated when the class is defined.
    """

    def __init__(self, db_path: Optional[str] = None):
        """Create the tables used by the bot if they do not exist yet.

        Args:
            db_path: Path of the SQLite database file. When omitted, the
                ``path_sqldb`` value imported from ``bot.env_variable`` is
                used, which is the behaviour existing callers rely on.
        """
        self._db_path = path_sqldb if db_path is None else db_path

        # One row per user: running total of words and words written since
        # the last statistics report.
        table_peoplewords = """
            CREATE TABLE IF NOT EXISTS people_words(
                user_id VARCHAR(100) NOT NULL,
                number_words INT,
                delta_words INT,
                PRIMARY KEY (user_id));
        """
        table_citation = """
            CREATE TABLE IF NOT EXISTS citations(
                citation_author VARCHAR(25) NOT NULL,
                citation_text VARCHAR(1000)
            );
        """
        table_birthday = """
            CREATE TABLE IF NOT EXISTS birthday(
                birthday_id VARCHAR(100) NOT NULL,
                birthday_txt VARCHAR(100),
                PRIMARY KEY (birthday_id)
            );
        """
        with self._connect() as connexion:
            for statement in (table_peoplewords, table_citation, table_birthday):
                connexion.execute(statement)
            connexion.commit()

    def _connect(self):
        """Return a context manager yielding a connection that is always closed.

        ``sqlite3.Connection`` used directly in a ``with`` statement only
        manages the transaction; wrapping it in ``closing`` guarantees the
        connection itself is released on exit. Callers must still call
        ``commit()`` explicitly for writes.
        """
        return closing(sqlite3.connect(self._db_path))

    async def add_numberword(self, message: "discord.Message") -> None:
        """Add the number of words of *message* to its author's counters.

        The word count is the number of whitespace-separated tokens in
        ``message.content``. Both ``number_words`` and ``delta_words`` are
        increased by that amount; a row is created for authors that are not
        registered yet.

        Note: this coroutine performs its SQLite calls synchronously.

        Args:
            message: The message whose author and content are recorded.
        """
        # user_id is a text column, so the id is bound as text.
        author_id = str(message.author.id)
        word_count = len(message.content.split())

        with self._connect() as connexion:
            # Increment inside the database instead of reading the values
            # into Python first; COALESCE keeps a NULL counter usable.
            updated_rows = connexion.execute(
                "UPDATE people_words"
                " SET number_words = COALESCE(number_words, 0) + ?,"
                " delta_words = COALESCE(delta_words, 0) + ?"
                " WHERE user_id = ?;",
                (word_count, word_count, author_id),
            ).rowcount
            # No row was touched: the author is not registered yet.
            if updated_rows == 0:
                connexion.execute(
                    "INSERT INTO people_words VALUES (?, ?, ?);",
                    (author_id, word_count, word_count),
                )
            connexion.commit()

    def get_numberwords(self, guild: "discord.Guild") -> str:
        """Build the word-count report and reset every ``delta_words`` to 0.

        Users are listed by decreasing ``number_words``. Rows whose user is
        not returned by ``guild.get_member`` are left out of the report, but
        their delta is reset like everyone else's.

        Args:
            guild: Guild used to resolve stored user ids to members.

        Returns:
            The report wrapped in a Markdown code block.
        """
        request = (
            "SELECT user_id, number_words, delta_words FROM people_words"
            " ORDER BY number_words DESC;"
        )
        parts = ["```\nStatistique du nombre de mots écrits:\n"]
        with self._connect() as connexion:
            for user_id, number_words, delta_words in connexion.execute(request).fetchall():
                member = guild.get_member(int(user_id))
                if member is not None:
                    parts.append(
                        f"\t· {member.display_name} -- {number_words} mots (+{delta_words}).\n"
                    )
            # Once the output is created, the deltas start again from zero.
            connexion.execute("UPDATE people_words SET delta_words=0;")
            connexion.commit()
        parts.append("```")
        return "".join(parts)

    def get_citation(self, author: Optional[str]) -> str:
        """Return one random citation, optionally restricted to *author*.

        Args:
            author: Exact author name to filter on, or ``None`` to pick
                among all citations.

        Returns:
            The citation formatted as ``« text » - author``, or a fixed
            message when no citation matches.
        """
        request = "SELECT citation_author, citation_text FROM citations"
        parameters = ()
        if author is not None:
            # Bound parameter: the name is compared as a plain string and
            # can never be interpreted as SQL or as a column name.
            request += " WHERE citation_author = ?"
            parameters = (author,)
        request += " ORDER BY RANDOM() LIMIT 1;"

        with self._connect() as connexion:
            result = connexion.execute(request, parameters).fetchone()

        if result is None:
            return "Il n'y a pas de citation qui correspond à votre demande :c"
        return f"« {result[1]} » - {result[0]}"

    def add_birthday(self, birthday_id: str, birthday_date: str) -> None:
        """Store *birthday_date* for *birthday_id*, replacing any previous value.

        Args:
            birthday_id: Key of the birthday entry (primary key of the table).
            birthday_date: Text stored in the ``birthday_txt`` column.
        """
        with self._connect() as connexion:
            # birthday_id is the primary key, so REPLACE overwrites an
            # existing entry and inserts otherwise, in a single statement.
            connexion.execute(
                "INSERT OR REPLACE INTO birthday VALUES (?, ?);",
                (birthday_id, birthday_date),
            )
            connexion.commit()

    def delete_birthday(self, birthday_id: str) -> None:
        """Remove the birthday entry for *birthday_id*; no-op if it is absent.

        Args:
            birthday_id: Key of the birthday entry to delete.
        """
        with self._connect() as connexion:
            connexion.execute("DELETE FROM birthday WHERE birthday_id=?", (birthday_id,))
            connexion.commit()

    def get_all_birthday(self) -> Dict[str, str]:
        """Return every stored birthday.

        Returns:
            A dict mapping ``birthday_id`` to ``birthday_txt``, with keys in
            ascending ``birthday_id`` order.
        """
        request = "SELECT birthday_id, birthday_txt FROM birthday ORDER BY birthday_id;"
        with self._connect() as connexion:
            # Each row is a (birthday_id, birthday_txt) pair.
            return dict(connexion.execute(request).fetchall())