]> git.walde.dev - beanbeanbean/commitdiff
Clean up recurring code
authorDustin Walde <redacted>
Tue, 7 Nov 2023 21:13:30 +0000 (13:13 -0800)
committerDustin Walde <redacted>
Tue, 7 Nov 2023 21:13:30 +0000 (13:13 -0800)
- Add generic error message if one fails
- Clean up amortize decimal math

.gitignore [new file with mode: 0644]
src/beanbeanbean/recurring.py

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..849ddff
--- /dev/null
@@ -0,0 +1 @@
+dist/
index 139065d01c9b60b74374512828e9a3b86b16c74c..9693bb9b78269a10524d0f8b9beb6e1e7397f2f7 100644 (file)
@@ -1,3 +1,4 @@
+from datetime import date, datetime
 from decimal import (
     Decimal,
     localcontext,
@@ -6,67 +7,95 @@ from decimal import (
 from beancount.core import amount
 from beancount.core.data import (
     Entries,
+    new_metadata,
     Posting,
     Transaction,
 )
+from beancount.loader import LoadError
 from dateutil import rrule
 import recurrent
 
+from typing import List, Optional, Union
+
 __plugins__ = ('recurring',)
 
 
+def generate_rrule_from_string(phrase: str, start_date: Optional[Union[date, datetime]] = None):
+    r = recurrent.RecurringEvent(now_date=start_date)
+    recurr_val = phrase
+    r.parse(recurr_val)
+    if r.dtstart is None and start_date is not None:
+        r.dtstart = start_date
+    return rrule.rrulestr(r.get_RFC_rrule())
+
+
+def handle_recurring_transaction(txn: Transaction) -> Union[List[Transaction], LoadError]:
+    post_vals = []
+    for post in txn.postings:
+        post_vals.append([post.units, post.cost])
+    amortize = 'amortize' in txn.meta
+
+    rr = generate_rrule_from_string(txn.meta['recurring'], txn.date)
+    dates = [dt for dt in rr]
+
+    # setup data_copy
+    txn_dict = txn._asdict()
+    del txn_dict["date"]
+    del txn_dict["postings"]
+
+    tolerances = txn.meta['__tolerances__']
+    entries = []
+
+    for i, dt in enumerate(dates):
+        new_txn = Transaction(date=dt.date(), postings=[], **txn_dict)
+        if amortize:
+            for j, posting in enumerate(txn.postings):
+                post_dict = posting._asdict()
+                for amt, key in ((post_vals[j][0], 'units'), (post_vals[j][1], 'cost')):
+                    if post_dict[key] is None:
+                        continue
+
+                    with localcontext() as ctx:
+                        ctx.prec = len(str(tolerances[amt.currency])) - 2
+                        amortized_amount = amount.div(amt, Decimal(len(dates)-i))
+                        post_dict[key] = amortized_amount
+                    post_vals[j][0] = amount.sub(amt, amortized_amount)
+                new_txn.postings.append(Posting(**post_dict))
+        else:
+            for posting in txn.postings:
+                new_txn.postings.append(posting)
+        entries.append(new_txn)
+
+    # replace metadata with computed values to show it was processed
+    # this dict is shared among all entries..
+    txn.meta['rphrase'] = txn.meta['recurring']
+    del txn.meta['recurring']
+    txn.meta['rstart'] = txn.date
+    txn.meta['rcount'] = len(dates)
+
+    return entries
+
+
 def recurring(entries: Entries, options_map, config_string=""):
     del options_map, config_string # unused
     out_entries = []
+    errors = []
     for entry in entries:
         if type(entry) is not Transaction \
-                or 'recurring' not in entry[0]:
+                or 'recurring' not in entry.meta:
             out_entries.append(entry)
         else:
-            tdict = entry._asdict()
-            del tdict["date"]
-            del tdict["postings"]
-            r = recurrent.RecurringEvent(now_date=entry[1])
-            recurr_val = entry[0]['recurring']
-            r.parse(recurr_val)
-            r.dtstart = entry[1]
-            rr = rrule.rrulestr(r.get_RFC_rrule())
-
-            post_vals = []
-            for post in entry.postings:
-                print(type(post.units))
-                post_vals.append([post.units, post.cost])
-            print(post_vals)
-            print(tdict)
-            amortize = 'amortize' in entry[0]
-
-            dates = [el for el in rr]
-
-            for i, dt in enumerate(dates):
-                txn = Transaction(date=dt.date(), postings=[], **tdict)
-                if amortize:
-                    for j, posting in enumerate(entry.postings):
-                        remaining_units, remaining_cost = post_vals[j]
-                        post_dict = posting._asdict()
-                        #del post_dict['cost']
-                        del post_dict['units']
-                        print(type(remaining_units))
-                        with localcontext() as ctx:
-                            ctx.prec = len(str(txn.meta['__tolerances__'][remaining_units.currency]))-2
-                            units = amount.div(remaining_units, Decimal(len(dates)-i))
-                        print(units, type(units), units._asdict())
-                        post_vals[j][0] = amount.sub(remaining_units, units)
-                        print(post_vals[j][0])
-                        txn.postings.append(Posting(units=units, **post_dict))
+            try:
+                res = handle_recurring_transaction(entry)
+                if type(res) is list:
+                    out_entries.extend(res)
                 else:
-                    for posting in entry.postings:
-                        txn.postings.append(posting)
-                out_entries.append(txn)
-
-            # replace metadata with computed values to show it was processed
-            entry.meta['bbb-phrase'] = entry.meta['recurring']
-            del entry.meta['recurring']
-            entry.meta['bbb-start'] = entry[1]
+                    errors.append(res)
+            except Exception as e:
+                errors.append(LoadError(
+                    source=new_metadata(entry.meta["filename"], entry.meta["lineno"]),
+                    message="Failed to handle recurring transaction: {}".format(e),
+                    entry=entry))
 
-    return out_entries, []
+    return out_entries, errors