From: Dustin Walde Date: Tue, 25 Apr 2023 23:01:26 +0000 (-0700) Subject: Add encompassing TimeSheet class to tie data state X-Git-Url: https://git.walde.dev/?a=commitdiff_plain;h=87a253dc721a9a12acd14b5c4b83288bc6d158ce;p=punch Add encompassing TimeSheet class to tie data state --- diff --git a/src/time_sheet.py b/src/time_sheet.py new file mode 100644 index 0000000..571578f --- /dev/null +++ b/src/time_sheet.py @@ -0,0 +1,125 @@ +# ©2023 Dustin Walde — GPL-3.0-or-later + +import csv +from datetime import datetime +from io import TextIOWrapper +import logging + +from typing import Dict, List, Optional, Union + +from category import Category +from entry import Entry + +logger = logging.getLogger(__name__) + + +def parsedate(date_str: str) -> datetime: + return datetime.fromisoformat(date_str) + +class TimeSheet: + + def __init__(self) -> None: + self.root = Category( + abbr = '', + name = 'Root', + parent = None, + children = [], + ) + self.categories = {"": self.root} + self.entries = [] + self.cat_entries = {} + + + def load_csv(self, filepath: str) -> None: + try: + with open(filepath, encoding='utf-8') as file: + self._load_file(file) + except FileNotFoundError: + logger.error(f"Could not load file: {filepath}") + logger.debug(self.categories) + logger.debug(self.entries) + + + # private helpers + + + def _load_category_hierarchy(self, category_rows: List[List[str]]) -> None: + self.categories: Dict[str, Category] = { "": self.root } + self.root.children = [] + for row in category_rows: + if row[1] == 'category': + self.categories[row[0]] = Category( + abbr = row[0], + name = row[2], + ) + elif row[1] == 'group': + self.categories[row[2]] = Category( + abbr = row[2], + name = row[3], + children = [], + ) + for row in category_rows: + parent = "" + if row[1] == 'category' and len(row) > 3: + parent = row[3] + abbr = row[0] + elif row[1] == 'group' and len(row) > 4: + parent = row[4] + abbr = row[2] + else: + continue + if parent in self.categories: + parent = self.categories[parent] + current = self.categories[abbr] + if parent.children is None: + logger.warning(f"Parent {parent.abbr} is not a group.") + else: + parent.children.append(current) + current.parent = parent + self.root = self.categories[''] + + + def _load_file(self, file: TextIOWrapper) -> None: + categories = [] + cat_to_times = {} + reader = csv.reader(file) + for row in reader: + if len(row) < 2: + logger.warning(f'Bad csv entry: {row}') + continue + type = row[1] + if type == 'category' or type == 'group': + categories.append(row) + else: + abbr = row[0] + if abbr not in cat_to_times: + cat_to_times[abbr] = [] + cat_to_times[abbr].append(row) + + self._load_category_hierarchy(categories) + self._load_times(cat_to_times) + + + def _load_times(self, cat_to_times: Dict[str, List[str]]) -> None: + self.entries = [] + self.cat_entries = {} + for cat, times in cat_to_times.items(): + self.cat_entries[cat] = [] + times.sort(key=lambda e: parsedate(e[2])) + + current_entry: Optional[Entry] = None + for time in times: + if current_entry is None and time[1] == 'in': + index = len(self.entries) + current_entry = Entry(cat, parsedate(time[2])) + self.entries.append(current_entry) + self.cat_entries[cat].append(index) + elif current_entry is not None and time[1] == 'out': + current_entry.t_out = parsedate(time[2]) + current_entry = None + else: + logger.error(f"Time entry out of order for category {cat}") + logger.error(current_entry) + logger.error(time) + break +