]> git.walde.dev - punch/commitdiff
Add encompassing TimeSheet class to tie data state
authorDustin Walde <redacted>
Tue, 25 Apr 2023 23:01:26 +0000 (16:01 -0700)
committerDustin Walde <redacted>
Tue, 25 Apr 2023 23:01:26 +0000 (16:01 -0700)
src/time_sheet.py [new file with mode: 0644]

diff --git a/src/time_sheet.py b/src/time_sheet.py
new file mode 100644 (file)
index 0000000..571578f
--- /dev/null
@@ -0,0 +1,125 @@
+# ©2023 Dustin Walde — GPL-3.0-or-later
+
+import csv
+from datetime import datetime
+from io import TextIOWrapper
+import logging
+
+from typing import Dict, List, Optional, Union
+
+from category import Category
+from entry import Entry
+
+logger = logging.getLogger(__name__)
+
+
+def parsedate(date_str: str) -> datetime:
+    return datetime.fromisoformat(date_str)
+
+class TimeSheet:
+
+    def __init__(self) -> None:
+        self.root = Category(
+                abbr = '',
+                name = 'Root',
+                parent = None,
+                children = [],
+                )
+        self.categories = {"": self.root}
+        self.entries = []
+        self.cat_entries = {}
+
+
+    def load_csv(self, filepath: str) -> None:
+        try:
+            with open(filepath, encoding='utf-8') as file:
+                self._load_file(file)
+        except FileNotFoundError:
+            logger.error(f"Could not load file: {filepath}")
+        logger.debug(self.categories)
+        logger.debug(self.entries)
+
+
+    # private helpers
+
+
+    def _load_category_hierarchy(self, category_rows: List[List[str]]) -> None:
+        self.categories: Dict[str, Category] = { "": self.root }
+        self.root.children = []
+        for row in category_rows:
+            if row[1] == 'category':
+                self.categories[row[0]] = Category(
+                    abbr = row[0],
+                    name = row[2],
+                    )
+            elif row[1] == 'group':
+                self.categories[row[2]] = Category(
+                    abbr = row[2],
+                    name = row[3],
+                    children = [],
+                    )
+        for row in category_rows:
+            parent = ""
+            if row[1] == 'category' and len(row) > 3:
+                parent = row[3]
+                abbr = row[0]
+            elif row[1] == 'group' and len(row) > 4:
+                parent = row[4]
+                abbr = row[2]
+            else:
+                continue
+            if parent in self.categories:
+                parent = self.categories[parent]
+                current = self.categories[abbr]
+                if parent.children is None:
+                    logger.warning(f"Parent {parent.abbr} is not a group.")
+                else:
+                    parent.children.append(current)
+                    current.parent = parent
+        self.root = self.categories['']
+
+
+    def _load_file(self, file: TextIOWrapper) -> None:
+        categories = []
+        cat_to_times = {}
+        reader = csv.reader(file)
+        for row in reader:
+            if len(row) < 2:
+                logger.warning(f'Bad csv entry: {row}')
+                continue
+            type = row[1]
+            if type == 'category' or type == 'group':
+                categories.append(row)
+            else:
+                abbr = row[0]
+                if abbr not in cat_to_times:
+                    cat_to_times[abbr] = []
+                cat_to_times[abbr].append(row)
+
+        self._load_category_hierarchy(categories)
+        self._load_times(cat_to_times)
+
+
+    def _load_times(self, cat_to_times: Dict[str, List[str]]) -> None:
+        self.entries = []
+        self.cat_entries = {}
+        for cat, times in cat_to_times.items():
+            self.cat_entries[cat] = []
+            times.sort(key=lambda e: parsedate(e[2]))
+
+            current_entry: Optional[Entry] = None
+            for time in times:
+                if current_entry is None and time[1] == 'in':
+                    index = len(self.entries)
+                    current_entry = Entry(cat, parsedate(time[2]))
+                    self.entries.append(current_entry)
+                    self.cat_entries[cat].append(index)
+                elif current_entry is not None and time[1] == 'out':
+                    current_entry.t_out = parsedate(time[2])
+                    current_entry = None
+                else:
+                    logger.error(f"Time entry out of order for category {cat}")
+                    logger.error(current_entry)
+                    logger.error(time)
+                    break
+