+from datetime import date, datetime
from decimal import (
Decimal,
localcontext,
from beancount.core import amount
from beancount.core.data import (
Entries,
+ new_metadata,
Posting,
Transaction,
)
+from beancount.loader import LoadError
from dateutil import rrule
import recurrent
+from typing import List, Optional, Union
+
__plugins__ = ('recurring',)
+def generate_rrule_from_string(phrase: str, start_date: Optional[Union[date, datetime]] = None):
+ r = recurrent.RecurringEvent(now_date=start_date)
+ recurr_val = phrase
+ r.parse(recurr_val)
+ if r.dtstart is None and start_date is not None:
+ r.dtstart = start_date
+ return rrule.rrulestr(r.get_RFC_rrule())
+
+
+def handle_recurring_transaction(txn: Transaction) -> Union[List[Transaction], LoadError]:
+ post_vals = []
+ for post in txn.postings:
+ post_vals.append([post.units, post.cost])
+ amortize = 'amortize' in txn.meta
+
+ rr = generate_rrule_from_string(txn.meta['recurring'], txn.date)
+ dates = [dt for dt in rr]
+
+ # setup data_copy
+ txn_dict = txn._asdict()
+ del txn_dict["date"]
+ del txn_dict["postings"]
+
+ tolerances = txn.meta['__tolerances__']
+ entries = []
+
+ for i, dt in enumerate(dates):
+ new_txn = Transaction(date=dt.date(), postings=[], **txn_dict)
+ if amortize:
+ for j, posting in enumerate(txn.postings):
+ post_dict = posting._asdict()
+ for amt, key in ((post_vals[j][0], 'units'), (post_vals[j][1], 'cost')):
+ if post_dict[key] is None:
+ continue
+
+ with localcontext() as ctx:
+ ctx.prec = len(str(tolerances[amt.currency])) - 2
+ amortized_amount = amount.div(amt, Decimal(len(dates)-i))
+ post_dict[key] = amortized_amount
+ post_vals[j][0] = amount.sub(amt, amortized_amount)
+ new_txn.postings.append(Posting(**post_dict))
+ else:
+ for posting in txn.postings:
+ new_txn.postings.append(posting)
+ entries.append(new_txn)
+
+ # replace metadata with computed values to show it was processed
+ # this dict is shared among all entries..
+ txn.meta['rphrase'] = txn.meta['recurring']
+ del txn.meta['recurring']
+ txn.meta['rstart'] = txn.date
+ txn.meta['rcount'] = len(dates)
+
+ return entries
+
+
def recurring(entries: Entries, options_map, config_string=""):
del options_map, config_string # unused
out_entries = []
+ errors = []
for entry in entries:
if type(entry) is not Transaction \
- or 'recurring' not in entry[0]:
+ or 'recurring' not in entry.meta:
out_entries.append(entry)
else:
- tdict = entry._asdict()
- del tdict["date"]
- del tdict["postings"]
- r = recurrent.RecurringEvent(now_date=entry[1])
- recurr_val = entry[0]['recurring']
- r.parse(recurr_val)
- r.dtstart = entry[1]
- rr = rrule.rrulestr(r.get_RFC_rrule())
-
- post_vals = []
- for post in entry.postings:
- print(type(post.units))
- post_vals.append([post.units, post.cost])
- print(post_vals)
- print(tdict)
- amortize = 'amortize' in entry[0]
-
- dates = [el for el in rr]
-
- for i, dt in enumerate(dates):
- txn = Transaction(date=dt.date(), postings=[], **tdict)
- if amortize:
- for j, posting in enumerate(entry.postings):
- remaining_units, remaining_cost = post_vals[j]
- post_dict = posting._asdict()
- #del post_dict['cost']
- del post_dict['units']
- print(type(remaining_units))
- with localcontext() as ctx:
- ctx.prec = len(str(txn.meta['__tolerances__'][remaining_units.currency]))-2
- units = amount.div(remaining_units, Decimal(len(dates)-i))
- print(units, type(units), units._asdict())
- post_vals[j][0] = amount.sub(remaining_units, units)
- print(post_vals[j][0])
- txn.postings.append(Posting(units=units, **post_dict))
+ try:
+ res = handle_recurring_transaction(entry)
+ if type(res) is list:
+ out_entries.extend(res)
else:
- for posting in entry.postings:
- txn.postings.append(posting)
- out_entries.append(txn)
-
- # replace metadata with computed values to show it was processed
- entry.meta['bbb-phrase'] = entry.meta['recurring']
- del entry.meta['recurring']
- entry.meta['bbb-start'] = entry[1]
+ errors.append(res)
+ except Exception as e:
+ errors.append(LoadError(
+ source=new_metadata(entry.meta["filename"], entry.meta["lineno"]),
+ message="Failed to handle recurring transaction: {}".format(e),
+ entry=entry))
- return out_entries, []
+ return out_entries, errors