from .errors import *
import aiohttp
from aiohttp.client_exceptions import ClientConnectionError
import asyncio
import ipaddress
import atexit
import logging
import time
# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)

# NOTE: this configures the *root* logger as an import-time side effect:
# records at INFO and above go to "axterdb.log", which is truncated
# (filemode "w") every time the module is imported in a fresh process.
logging.basicConfig(
    filename="axterdb.log",
    filemode="w",
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
class Client:
    """Asynchronous HTTP client bound to one database on a server instance.

    Every request carries the access key in a ``KEY`` header and targets
    ``http://<host>/...``. Call :meth:`connect` before any other coroutine;
    until then the data methods raise ``NotConnected``.

    Parameters
    ----------
    name: :class:`str`
        Name of the database to work with (used in ``/database/<name>/...``).
    key: :class:`str`
        Access key sent with every request.
    host: :class:`str`
        Instance address in ``ip`` or ``ip:port`` form; the part before the
        first ``:`` must parse as an IP address.
    show_keys: :class:`bool`
        When ``False`` (default) the key is replaced by ``[HIDDEN]`` in the
        log and console messages written by this class. The flag is also
        forwarded to the exceptions raised on connection problems.
    """

    def __init__(self, *, name: str, key: str, host: str, show_keys: bool = False):
        self.name: str = name
        self.key: str = key
        self.host: str = host
        self.show_keys: bool = show_keys
        # Declared only: the attribute is first assigned by ``_check_access``.
        self.latency: float
        # Base headers for every request. Never mutate this dict per request;
        # use ``_request_headers`` to derive a copy instead.
        self._headers: dict = {"KEY": str(self.key)}
        self._session: aiohttp.ClientSession = None
        self._connected: bool = False
        self._accepted_types = ["TEXT", "INT", "REAL", "NULL"]

    async def connect(self):
        """|coro|

        Connects to the database, checks if everything is correct.

        Raises
        ------
        AlreadyConnected
            This client is already connected.
        InvalidInstanceIP
            The host is not a valid IP address.
        ConnectionFailure
            The instance could not be reached or did not answer with 200.
        InvalidKey
            The instance rejected the key.
        NoAccess
            The key has no access to the database.
        """
        logger.info("Connection to database started.")
        # Refuse *before* creating a session, otherwise the live session
        # would be overwritten (and leaked) by the new one.
        if self._connected:
            raise AlreadyConnected(self.host, self.key, self.name, self.show_keys)

        self._session = aiohttp.ClientSession()
        try:
            await self._check_instance()
            await self._check_access()
        except BaseException:
            # Do not leave an open session behind when a check fails (or the
            # task is cancelled); the error itself is re-raised unchanged.
            await self._session.close()
            self._session = None
            raise

        atexit.register(self._close)
        self._connected = True
        logger.info("Database connection established")

    def route(self, path: str = "") -> str:
        """Return the absolute ``http://`` URL for ``path`` on the instance."""
        return f"http://{self.host}{path}"

    def _request_headers(self, **extra: str) -> dict:
        """Return a fresh copy of the base headers extended with ``extra``."""
        return {**self._headers, **extra}

    async def _check_instance(self) -> None:
        """|coro|

        Check if the IP specified is a correct instance

        Raises
        ------
        InvalidInstanceIP
            The IP provided is not an actual IP
        ConnectionFailure
            Connection to the instance failed
        AlreadyConnected
            Already connected to the specified database.
        """
        logger.info("Checking instance.")
        if self._connected:
            raise AlreadyConnected(self.host, self.key, self.name, self.show_keys)

        # Only the part before the first ":" is validated (the port is ignored).
        try:
            ipaddress.ip_address(self.host.split(":")[0])
        except ValueError as err:
            raise InvalidInstanceIP(self.host) from err

        try:
            async with self._session.get(self.route("/")) as response:
                if response.status != 200:
                    raise ConnectionFailure(self.host, self.key, self.show_keys)
        except ClientConnectionError as err:
            raise ConnectionFailure(self.host, self.key, self.show_keys) from err
        logger.info("Instance checked")

    async def _check_access(self) -> None:
        """|coro|

        Checks if the specified key has access to the specified database.

        Also stores in :attr:`latency` the time, in seconds, between sending
        the ``/me`` request and receiving the start of its response.

        Raises
        ------
        NoAccess
            Key specified does not have access to the specified database
        InvalidKey
            The instance answered ``/me`` with a status other than 200.
        AlreadyConnected
            Already connected to the specified database.
        """
        shown_key = self.key if self.show_keys else "[HIDDEN]"
        logger.info("Checking access for key %s to %s", shown_key, self.name)
        if self._connected:
            raise AlreadyConnected(self.host, self.key, self.name, self.show_keys)

        # perf_counter is monotonic, so the measurement cannot go negative or
        # jump when the system clock is adjusted.
        started = time.perf_counter()
        async with self._session.get(self.route("/me"), headers=self._headers) as response:
            self.latency = time.perf_counter() - started
            if response.status != 200:
                raise InvalidKey()
            payload = await response.json()
            if self.name not in payload["detail"]["data"]["Databases"]:
                raise NoAccess(self.host, self.key, self.name, self.show_keys)
            print(f"Connected to {self.name}! (Instance: {self.host} | Key: {shown_key})")

    async def create_table(self, table: str, **columns) -> bool:
        """|coro|

        Creates a table on the database

        Parameters
        ----------
        table: :class:`str`
            The table name to create.
        **columns
            Additional arguments are used as columns for the table
            (``column_name="TYPE"``). Types are checked case-insensitively
            against TEXT, INT, REAL and NULL.

        Returns
        -------
        :class:`bool`
            Returns `True` if function was executed successfully.

        Raises
        ------
        NotConnected
            Not connected to the database.
        UnAcceptedType
            Invalid column type.
        InvalidKey
            Key is invalid.
        InvalidTable
            Table not provided.
        InvalidRows
            Rows not provided.
        TableAlreadyExists
            Table with that name already exists.
        UnknownError
            The instance answered with an unexpected status.
        """
        if not self._connected:
            raise NotConnected()

        for column_type in columns.values():
            if column_type.upper() not in self._accepted_types:
                raise UnAcceptedType(column_type.upper())

        # ``params`` lets aiohttp URL-encode the table name instead of pasting
        # it raw into the query string.
        async with self._session.post(
            self.route(f"/database/{self.name}/create"),
            params={"table": table},
            headers=self._headers,
            json=columns,
        ) as response:
            status = response.status
            if status == 200:
                return True
            if status == 401:
                raise InvalidKey()
            if status == 409:
                raise TableAlreadyExists(table)
            if status == 422:
                payload = await response.json()
                message: str = payload["detail"]["message"]
                if "table" in message:
                    raise InvalidTable()
                if "rows" in message:
                    raise InvalidRows()
            raise UnknownError(status)

    async def get(self, table: str, amount: str = None, **kwargs) -> list:
        """|coro|

        Get data from a table

        Parameters
        ----------
        table: :class:`str`
            The table to get data from.
        amount: :class:`str`
            The amount of data to get. Omitted from the request when falsy.
        **kwargs
            Additional arguments are used as condition arguments (WHERE in SQL)

        Returns
        -------
        :class:`list`
            Returns a list of lists containing data, or an empty list if no data was found.

        Raises
        ------
        NotConnected
            Not connected to the database.
        InvalidColumn
            A condition refers to a column the instance does not know.
        UnknownError
            The instance answered with an unexpected status.
        """
        if not self._connected:
            raise NotConnected()

        # Build per-request headers on a copy so "table"/"amount" never leak
        # into later requests through the shared base headers.
        headers = self._request_headers(table=table)
        if amount:
            headers["amount"] = str(amount)

        async with self._session.get(
            self.route(f"/database/{self.name}/select"), headers=headers, json=kwargs
        ) as response:
            if response.status == 200:
                payload = await response.json()
                return payload["detail"]["rows"]
            if response.status == 422:
                payload = await response.json()
                message = payload["detail"]["message"]
                column_name = message.replace("No column found with the name ", "")
                raise InvalidColumn(column_name)
            raise UnknownError(response.status)
        # TODO: Add errors for this

    async def insert(self, table: str, **data) -> bool:
        """|coro|

        Insert data into a table

        Parameters
        ----------
        table: :class:`str`
            The table to insert data to.
        **data
            Additional arguments are used as data to insert.

        Returns
        -------
        :class:`bool`
            Returns True if query executed successfully.

        Raises
        ------
        NotConnected
            Not connected to the database.
        InvalidColumn
            Invalid column provided.
        UnknownError
            The instance answered with an unexpected status.
        """
        if not self._connected:
            raise NotConnected()

        async with self._session.get(
            self.route(f"/database/{self.name}/insert"),
            headers=self._request_headers(table=table),
            json=data,
        ) as response:
            if response.status == 200:
                return True
            if response.status == 422:
                payload = await response.json()
                # The first word of the error detail is passed on as the column.
                raise InvalidColumn(payload["detail"].split(" ", 1)[0])
            raise UnknownError(response.status)
        # TODO: Add errors from status codes.

    async def delete(self, table: str, **data) -> bool:
        """|coro|

        Deletes data from a table

        Parameters
        ----------
        table: :class:`str`
            The table to delete data from.
        **data
            Additional arguments are used as data to delete.

        Returns
        -------
        :class:`bool`
            Returns True if query executed successfully.

        Raises
        ------
        NotConnected
            Not connected to the database.
        UnknownError
            The instance answered with a status other than 200.
        """
        if not self._connected:
            raise NotConnected()

        async with self._session.get(
            self.route(f"/database/{self.name}/delete"),
            headers=self._request_headers(table=table),
            json=data,
        ) as response:
            if response.status == 200:
                return True
            raise UnknownError(response.status)

    async def delete_table(self, table: str) -> bool:
        """|coro|

        Deletes a table

        Parameters
        ----------
        table: :class:`str`
            The table to delete.

        Returns
        -------
        :class:`bool`
            Returns True if query executed successfully.

        Raises
        ------
        NotConnected
            Not connected to the database.
        UnknownError
            The instance answered with a status other than 200.
        """
        if not self._connected:
            raise NotConnected()

        async with self._session.get(
            self.route(f"/database/{self.name}/delete_table"),
            headers=self._request_headers(table=table),
        ) as response:
            if response.status == 200:
                return True
            raise UnknownError(response.status)

    async def get_all_tables(self) -> list:
        """|coro|

        Get all tables of the database

        Returns
        -------
        :class:`list`
            Returns a list of table names

        Raises
        ------
        NotConnected
            Not connected to the database.
        InvalidKey
            Key is invalid.
        UnknownError
            The instance answered with an unexpected status.
        """
        if not self._connected:
            raise NotConnected()

        async with self._session.get(
            self.route(f"/database/{self.name}/get"), headers=self._headers
        ) as response:
            if response.status == 200:
                payload = await response.json()
                return payload["detail"]["tables"]
            if response.status == 401:
                raise InvalidKey()
            raise UnknownError(response.status)
        # TODO: Add errors from status codes.

    async def check_table(self, table: str) -> bool:
        """|coro|

        Check if a table exists

        Parameters
        ----------
        table: :class:`str`
            The table name to look for.

        Returns
        -------
        :class:`bool`
            Returns True if table exists, else False

        Raises
        ------
        NotConnected
            Not connected to the database.
        InvalidKey
            Key is invalid.
        UnknownError
            The instance answered with an unexpected status.
        """
        if not self._connected:
            raise NotConnected()

        # ``params`` lets aiohttp URL-encode the table name.
        async with self._session.get(
            self.route(f"/database/{self.name}/get"),
            params={"table": table},
            headers=self._headers,
        ) as response:
            if response.status == 200:
                return True
            if response.status == 404:
                return False
            if response.status == 401:
                raise InvalidKey()
            raise UnknownError(response.status)
        # TODO: Add errors from status codes.

    def _close(self) -> None:
        """Close the HTTP session; registered with ``atexit`` by ``connect``."""
        session = self._session
        # Nothing to do if no session exists or it was already closed;
        # without this guard the exit hook would raise at shutdown.
        if session is None or session.closed:
            return
        asyncio.run(session.close())
class AdminClient(Client):
    """:class:`Client` with additional key-management (admin) endpoints.

    Takes the same keyword-only arguments as :class:`Client`. The admin
    coroutines send the client's key in the ``KEY`` header, like every other
    request.
    """

    def __init__(self, *, name: str, key: str, host: str, show_keys: bool = False):
        super().__init__(name=name, key=key, host=host, show_keys=show_keys)

    async def create_user(self, name: str, admin: bool = False) -> str:
        """|coro|

        Create a user.

        Parameters
        ----------
        name: :class:`str`
            Name of the user to create.
        admin: :class:`bool`
            Sent to the instance as ``1`` or ``0`` in the ``admin`` query
            parameter.

        Returns
        -------
        :class:`str`
            Generated key for the created user.

        Raises
        ------
        NotConnected
            Not connected to the database.
        UnknownError
            The instance answered with a status other than 200.
        """
        if not self._connected:
            raise NotConnected()

        # Pass the values through ``params`` so aiohttp URL-encodes them; a
        # name containing "&", "#" or "+" would otherwise corrupt the query.
        # The flag is sent as "0"/"1", exactly as the interpolated URL did.
        query = {"name": name, "admin": str(int(admin))}
        async with self._session.post(
            self.route("/admin/keys/create"), params=query, headers=self._headers
        ) as response:
            if response.status == 200:
                payload = await response.json()
                return payload["detail"]["data"]["key"]
            raise UnknownError(response.status)

    async def delete_user(self, key: str) -> bool:
        """|coro|

        Delete a user.

        Parameters
        ----------
        key: :class:`str`
            Key of the user to delete.

        Returns
        -------
        :class:`bool`
            Returns `True` if user was deleted successfully.

        Raises
        ------
        NotConnected
            Not connected to the database.
        UnknownError
            The instance answered with a status other than 200.
        """
        if not self._connected:
            raise NotConnected()

        # ``params`` URL-encodes the key instead of pasting it raw into the URL.
        async with self._session.post(
            self.route("/admin/keys/delete"), params={"key": key}, headers=self._headers
        ) as response:
            if response.status == 200:
                return True
            raise UnknownError(response.status)