| |
@@ -1,8 +1,8 @@
|
| |
import abc
|
| |
- import datetime
|
| |
+ from datetime import datetime
|
| |
from dataclasses import dataclass
|
| |
from enum import Enum
|
| |
- from typing import Tuple
|
| |
+ from typing import Tuple, Union
|
| |
from urllib.parse import urljoin
|
| |
|
| |
import pandas as pd
|
| |
@@ -13,12 +13,12 @@
|
| |
|
| |
@dataclass
class DatasetMeta:
    """Metadata describing a dataset file and when it was last updated.

    ``updated_at`` may be supplied as an ISO-8601 string (e.g. taken
    directly from a JSON API response); it is normalized to a
    ``datetime`` instance in ``__post_init__``.
    """

    # Path/URL of the dataset file; downstream code consumes ".csv" only.
    filepath: str
    # Last-modified timestamp; an ISO-8601 string is accepted at construction.
    updated_at: datetime

    def __post_init__(self):
        # Use isinstance, not `type(...) == str`, so str subclasses are
        # normalized too (and the comparison is the idiomatic type check).
        if isinstance(self.updated_at, str):
            self.updated_at = datetime.fromisoformat(self.updated_at)
|
| |
|
| |
|
| |
class ReportFrequency(Enum):
|
| |
@@ -30,33 +30,53 @@
|
| |
|
| |
class Dataset(abc.ABC):
|
| |
    def __init__(self) -> None:
        """Initialize with empty sentinel state; real data arrives via load()."""
        # Remote source URL; set by load().
        self._source = ""
        # datetime.min acts as a "never updated" sentinel timestamp.
        self._meta = DatasetMeta(filepath="", updated_at=datetime.min)
        # Empty frame until load()/update() fetches real data.
        self._data = pd.DataFrame()
|
| |
|
| |
- def load(self, source: str) -> None:
|
| |
- self.source = source
|
| |
- meta_updated, meta = self._get_meta()
|
| |
    def __str__(self) -> str:
        # Human-readable one-line summary of the dataset's freshness.
        return f"Last updated: {self.last_updated}"
|
| |
|
| |
- if not meta_updated:
|
| |
- raise ConnectionError("Unable to identify remote datasource.")
|
| |
    @property
    def id(self) -> str:
        # Stable string identifier for the dataset; subclasses must override.
        raise NotImplementedError
|
| |
|
| |
- self._meta = meta
|
| |
- self.dataframe = self._get_dataframe()
|
| |
    @property
    def title(self) -> str:
        # Display title of the dataset; subclasses must override.
        raise NotImplementedError
|
| |
+
|
| |
    @property
    def data(self) -> pd.DataFrame:
        # Accessor for the loaded dataframe (empty until load() succeeds).
        return self._data
|
| |
|
| |
    @property
    def x(self) -> Union[pd.Series, str]:
        # Values (or column name) for the x-axis; subclasses must override.
        raise NotImplementedError
|
| |
+
|
| |
    @property
    def y(self) -> Union[pd.Series, str]:
        # Values (or column name) for the y-axis; subclasses must override.
        raise NotImplementedError
|
| |
+
|
| |
    @property
    def last_updated(self) -> datetime:
        # Timestamp carried by the most recently fetched metadata.
        return self._meta.updated_at
|
| |
|
| |
    @abc.abstractmethod
    def _prepare_data(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Transform the raw dataframe into its report-ready form.

        Must be implemented by subclasses; called by _get_data() after the
        CSV file has been read.
        """
        pass
|
| |
|
| |
    def _get_data(self) -> pd.DataFrame:
        """Read the file named in the current metadata and prepare it.

        Raises:
            InvalidDatasetFormat: if the metadata's filepath is not a ``.csv``.
        """
        if self._meta.filepath.endswith(".csv"):
            dataframe = pd.read_csv(self._meta.filepath)
            return self._prepare_data(dataframe)

        raise InvalidDatasetFormat()
|
| |
+
|
| |
def _get_meta(self) -> Tuple[bool, DatasetMeta]:
|
| |
updated_status, meta = False, self._meta
|
| |
try:
|
| |
- response = requests.get(self.source)
|
| |
+ response = requests.get(self._source)
|
| |
# TODO: logging the response
|
| |
if response.status_code == 201:
|
| |
updated_status, meta = True, DatasetMeta(**response.json())
|
| |
@@ -66,18 +86,22 @@
|
| |
|
| |
return updated_status, meta
|
| |
|
| |
- def _get_dataframe(self) -> pd.DataFrame:
|
| |
- if self._meta.filepath.endswith(".csv"):
|
| |
- dataframe = pd.read_csv(self._meta.filepath)
|
| |
- return self._prepare_dataframe(dataframe)
|
| |
    def load(self, source: str) -> None:
        """Point the dataset at ``source``, then fetch metadata and data.

        Raises:
            ConnectionError: when the remote metadata cannot be fetched.
        """
        self._source = source
        meta_updated, meta = self._get_meta()

        if not meta_updated:
            raise ConnectionError("Unable to identify remote datasource.")

        self._meta = meta
        self._data = self._get_data()
|
| |
|
| |
    def update(self) -> None:
        """Re-fetch metadata and reload the data when the remote reports a change."""
        # TODO: Add logging to check when there is an update request
        meta_updated, meta = self._get_meta()
        if meta_updated:
            self._meta = meta
            self._data = self._get_data()
|
| |
|
| |
|
| |
class ActiveContributors(Dataset):
|
| |
@@ -85,11 +109,27 @@
|
| |
super().__init__()
|
| |
self.report_frequency = ReportFrequency.WEEKLY.value
|
| |
|
| |
    @property
    def id(self) -> str:
        # Slug used to identify this dataset (e.g. in URLs/graph ids).
        return "active-contributors"
|
| |
+
|
| |
    @property
    def title(self) -> str:
        # Human-readable title shown for this dataset.
        return "Active Contributors"
|
| |
+
|
| |
    @property
    def x(self) -> Union[pd.Index, pd.Series, str]:
        # x-axis values: the resampled Date index produced by _prepare_data.
        return self._data.index
|
| |
+
|
| |
    @property
    def y(self) -> Union[pd.Series, str]:
        # y-axis: name of the column holding the aggregated counts.
        return "ActiveUsers"
|
| |
+
|
| |
def load(self, base_url: str) -> None:
|
| |
endpoint = urljoin(base_url, "reports/contributors")
|
| |
super(ActiveContributors, self).load(endpoint)
|
| |
|
| |
- def _prepare_dataframe(self, dataframe: pd.DataFrame) -> pd.DataFrame:
|
| |
+ def _prepare_data(self, dataframe: pd.DataFrame) -> pd.DataFrame:
|
| |
dataframe.Date = pd.to_datetime(dataframe.Date, format="%m/%d/%y")
|
| |
return dataframe.resample(self.report_frequency, on="Date").sum()
|
| |
|
| |
This function registers a callback that requests new data and updates the graph every day in production and every minute in the dev environment.