MarxBot/bot/dbmanager.py

200 lines
6.5 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# System packages
import sqlite3
from pathlib import Path
from typing import Optional, Dict
# PyPI packages
import discord
def initialize(path_sqldb: Path) -> None:
    """Initialize (if needed) the database used by the bot.

    Creates the tables "people_words", "citations" and "birthday" when they
    do not exist yet; tables that already exist are left untouched.

    :param path_sqldb: Path of the SQLite3 DB file.
    :return: None
    """
    # Word counters, one row per user.
    table_peoplewords = """
        CREATE TABLE IF NOT EXISTS people_words(
            user_id VARCHAR(100) NOT NULL,
            number_words INT,
            delta_words INT,
            PRIMARY KEY (user_id));
    """
    # Citations, stored as (author, text) pairs.
    table_citation = """
        CREATE TABLE IF NOT EXISTS citations(
            citation_author VARCHAR(25) NOT NULL,
            citation_text VARCHAR(1000)
        );
    """
    # Birthdays, one row per person.
    table_birthday = """
        CREATE TABLE IF NOT EXISTS birthday(
            birthday_id VARCHAR(100) NOT NULL,
            birthday_txt VARCHAR(100),
            PRIMARY KEY (birthday_id)
        );
    """
    connexion = sqlite3.connect(path_sqldb)
    try:
        for statement in (table_peoplewords, table_citation, table_birthday):
            connexion.execute(statement)
        connexion.commit()
    finally:
        # Release the connection even when one of the statements fails.
        connexion.close()
async def add_numberword(path_sqldb: Path, message: discord.Message) -> None:
    """Add the number of words of a message to its author's counters.

    Both "number_words" and "delta_words" of the author's row in the table
    "people_words" are increased by the number of whitespace-separated words
    of the message. The row is created when the author has none yet.

    :param path_sqldb: Path of the SQLite3 DB file.
    :param message: Message written by the person.
    :return: None
    """
    # "user_id" is a VARCHAR column, so the ID is bound as text.
    author_id = str(message.author.id)
    words_written = len(message.content.split())

    connexion = sqlite3.connect(path_sqldb)
    try:
        cursor = connexion.cursor()
        # Values are bound as parameters rather than formatted into the SQL.
        previous = cursor.execute(
            "SELECT number_words, delta_words FROM people_words WHERE user_id=?;",
            (author_id,),
        ).fetchone()
        if previous is None:
            # The author has not been registered in "people_words" yet.
            cursor.execute(
                "INSERT INTO people_words VALUES (?, ?, ?);",
                (author_id, words_written, words_written),
            )
        else:
            # The author is already registered: add to both counters.
            cursor.execute(
                "UPDATE people_words SET number_words=?, delta_words=? WHERE user_id=?;",
                (previous[0] + words_written, previous[1] + words_written, author_id),
            )
        connexion.commit()
    finally:
        # Release the connection even when a statement fails.
        connexion.close()
def get_numberwords(path_sqldb: Path, guild: discord.Guild) -> str:
    """Build the word-count report and reset the delta counters.

    Rows of "people_words" are listed by decreasing "number_words". Rows
    whose user cannot be found in ``guild`` are left out of the report.
    Once the report is built, "delta_words" is set to 0 for every row.

    :param path_sqldb: Path of the SQLite3 DB file.
    :param guild: The server in which are the people, to get their pseudonyms.
    :return: The report, wrapped in a Markdown code block.
    """
    request = "SELECT user_id, number_words, delta_words FROM people_words ORDER BY number_words DESC;"
    parts = ["```\nStatistique du nombre de mots écrits:\n"]

    connexion = sqlite3.connect(path_sqldb)
    try:
        for user_id, number_words, delta_words in connexion.execute(request).fetchall():
            member = guild.get_member(int(user_id))
            if member is not None:
                parts.append(f"\t· {member.display_name} -- {number_words} mots (+{delta_words}).\n")
        # Once the output is created, we reset the delta words.
        connexion.execute("UPDATE people_words SET delta_words=0;")
        connexion.commit()
    finally:
        # Release the connection even when building the report fails.
        connexion.close()

    parts.append("```")
    return "".join(parts)
def get_citation(path_sqldb: Path, author: Optional[str]) -> str:
    """Return one random entry of the table "citations".

    :param path_sqldb: Path of the SQLite3 DB file.
    :param author: Optional name of the author of the citation requested;
        must be None, "Karl Marx" or "Kadoc".
    :return: A random citation formatted with its author, or a message
        saying that no citation matches the request.
    :raises AssertionError: If ``author`` is not one of the accepted values
        (only when assertions are enabled).
    """
    assert author is None or author in ["Karl Marx", "Kadoc"]

    # Rows without text are skipped so the output never shows "None".
    conditions = ["citation_text IS NOT NULL"]
    parameters = []
    if author is not None:
        # The author is bound as a parameter, so the query stays safe even
        # when the assertion above is stripped (python -O).
        conditions.append("citation_author = ?")
        parameters.append(author)
    # The row is picked directly, so the author shown always belongs to the
    # citation that was drawn.
    request = (
        "SELECT citation_author, citation_text FROM citations WHERE "
        + " AND ".join(conditions)
        + " ORDER BY RANDOM() LIMIT 1;"
    )

    connexion = sqlite3.connect(path_sqldb)
    try:
        result = connexion.execute(request, parameters).fetchone()
    finally:
        # Release the connection even when the query fails.
        connexion.close()

    if result is None:
        return "Il n'y a pas de citation qui correspond à votre demande :c"
    return f"« {result[1]} » - {result[0]}"
def add_birthday(path_sqldb: Path, birthday_id: str, birthday_date: str) -> None:
    """Add the birthdate of someone, or replace the one already stored.

    :param path_sqldb: Path of the SQLite3 DB file.
    :param birthday_id: ID of the person.
    :param birthday_date: Birthdate of the person.
    :return: None
    """
    connexion = sqlite3.connect(path_sqldb)
    try:
        cursor = connexion.cursor()
        # Only the existence of the row matters, not its content.
        already_stored = cursor.execute(
            "SELECT 1 FROM birthday WHERE birthday_id=?", (birthday_id,)
        ).fetchone()
        if already_stored is None:
            cursor.execute("INSERT INTO birthday VALUES (?, ?)", (birthday_id, birthday_date))
        else:
            cursor.execute(
                "UPDATE birthday SET birthday_txt=? WHERE birthday_id=?",
                (birthday_date, birthday_id),
            )
        connexion.commit()
    finally:
        # Release the connection even when a statement fails.
        connexion.close()
def delete_birthday(path_sqldb: Path, birthday_id: str) -> None:
    """Delete the birthday information of someone from the DB.

    Nothing happens when no birthday is stored for ``birthday_id``.

    :param path_sqldb: Path of the SQLite3 DB file.
    :param birthday_id: ID of the person.
    :return: None
    """
    connexion = sqlite3.connect(path_sqldb)
    try:
        connexion.execute("DELETE FROM birthday WHERE birthday_id=?", (birthday_id,))
        connexion.commit()
    finally:
        # Release the connection even when the statement fails.
        connexion.close()
def get_all_birthday(path_sqldb: Path) -> Dict[str, str]:
    """Return all the birthdays stored in the DB.

    :param path_sqldb: Path of the SQLite3 DB file.
    :return: Mapping from each "birthday_id" to its "birthday_txt", in
        increasing order of "birthday_id".
    """
    request = "SELECT birthday_id, birthday_txt FROM birthday ORDER BY birthday_id;"
    connexion = sqlite3.connect(path_sqldb)
    try:
        rows = connexion.execute(request).fetchall()
    finally:
        # Release the connection even when the query fails.
        connexion.close()
    # Each row is an (id, text) pair, which is exactly what dict() expects.
    birthday: Dict[str, str] = dict(rows)
    return birthday