# Source listing metadata: 89 lines, 3.1 KiB, Python
import os.path
|
|
import sys
|
|
from argparse import ArgumentError
|
|
import sqlite3
|
|
|
|
|
|
def get_records(records):
    """Read record rows from a comma-separated text file.

    Each non-blank line is split on its first three commas into four fields.
    The first field is rearranged from ``YYYY-MM-DD`` to ``DD.MM.YYYY``; the
    fourth field keeps any further commas and has surrounding double quotes
    removed. The middle two fields are passed through unchanged.

    Args:
        records: Path of the UTF-8 encoded file to read.

    Returns:
        A list of 4-tuples of strings, one per non-blank line, in file order.

    Raises:
        ValueError: If a non-blank line has fewer than four fields.
    """
    extracted = []

    with open(records, 'r', encoding='utf-8') as file:
        for line_number, line in enumerate(file, start=1):
            line = line.strip('\n')
            # A blank (e.g. trailing) line carries no record; skipping it
            # avoids an IndexError on the missing fourth field.
            if not line.strip():
                continue

            # maxsplit=3 keeps commas inside the last field intact.
            values = line.split(',', 3)
            if len(values) != 4:
                raise ValueError(
                    f'{records}:{line_number}: expected 4 comma-separated '
                    f'fields, got {len(values)}'
                )

            date, second, third, comment = values
            # Reorder the ISO date (YYYY-MM-DD) into DD.MM.YYYY.
            date = f'{date[8:]}.{date[5:7]}.{date[:4]}'
            extracted.append((date, second, third, comment.strip('"')))

    return extracted
|
|
|
|
|
|
def get_estimates(estimates):
    """Read monthly estimates and group them into calendar quarters.

    Each non-blank line of the file has the form ``YYYY-MM,<estimate>``.
    Months must appear in strictly increasing order. Months that are absent
    from the file are represented as ``None``; estimates that are present are
    kept as the strings read from the file (possibly ``''``).

    Args:
        estimates: Path of the UTF-8 encoded file to read.

    Returns:
        A list of ``(year, quarter, estimate_0, estimate_1, estimate_2)``
        tuples in chronological order, where ``quarter`` is 1-4 and the three
        estimates belong to the first, second and third month of that
        quarter. Quarters in which no month has a non-empty estimate are
        omitted. An empty file yields an empty list.

    Raises:
        ValueError: If a month key is not a valid ``YYYY-MM`` value, or if
            the months are not strictly increasing.
    """
    # Maps an absolute month index (year * 12 + month - 1) to its estimate.
    monthly = {}
    previous_index = None

    with open(estimates, 'r', encoding='utf-8') as file:
        for line_number, line in enumerate(file, start=1):
            line = line.strip('\n')
            if not line.strip():
                continue

            # Split on the first comma only, so the estimate keeps any others.
            date, _, value = line.partition(',')
            try:
                year, month = int(date[:4]), int(date[5:])
            except ValueError:
                raise ValueError(
                    f'{estimates}:{line_number}: invalid month {date!r}, '
                    f'expected YYYY-MM'
                ) from None
            if not 1 <= month <= 12:
                raise ValueError(
                    f'{estimates}:{line_number}: month out of range in {date!r}'
                )

            index = year * 12 + (month - 1)
            # Duplicate or out-of-order months cannot be placed on the
            # timeline; reject them explicitly instead of looping forever
            # while trying to fill the "gap" up to them.
            if previous_index is not None and index <= previous_index:
                raise ValueError(
                    f'{estimates}:{line_number}: month {date!r} is not after '
                    f'the previous month'
                )
            previous_index = index
            monthly[index] = value

    if not monthly:
        return []

    # Integer-dividing the month index by 3 gives an absolute quarter index,
    # so grouping is aligned to calendar quarters regardless of which month
    # the file starts or ends on.
    first_quarter = min(monthly) // 3
    last_quarter = max(monthly) // 3

    grouped = []
    for quarter_index in range(first_quarter, last_quarter + 1):
        first_month = quarter_index * 3
        values = tuple(monthly.get(first_month + offset) for offset in range(3))
        # Skip quarters without any estimate, whether the months were empty
        # in the file ('') or missing from it altogether (None).
        if all(value is None or value == '' for value in values):
            continue
        year, quarter = divmod(quarter_index, 4)
        grouped.append((year, quarter + 1, *values))

    return grouped
|
|
|
|
|
|
def main():
    """Populate an existing SQLite database from a records and an estimates file.

    Reads three positional command-line arguments from ``sys.argv``: the
    database path, the records file and the estimates file. The parsed rows
    are inserted into the ``records`` and ``estimates`` tables and committed
    in one go.

    Raises:
        ArgumentError: If the argument count is wrong or any of the three
            paths does not exist.
    """
    if len(sys.argv) != 4:
        raise ArgumentError(argument=None, message='Usage: python3 populate.py <userdb> <records> <estimates>')

    userdb = sys.argv[1]
    records_file = sys.argv[2]
    estimates_file = sys.argv[3]

    # sqlite3.connect() would silently create a missing database file, so the
    # database is required to exist up front.
    if not os.path.exists(userdb):
        raise ArgumentError(argument=None, message='userdb must exist')

    if not os.path.exists(records_file):
        raise ArgumentError(argument=None, message=f'records file does not exist: {records_file}')

    if not os.path.exists(estimates_file):
        raise ArgumentError(argument=None, message=f'estimates file does not exist: {estimates_file}')

    print("reading data...")

    records = get_records(records_file)
    estimates = get_estimates(estimates_file)

    db = sqlite3.connect(userdb)
    try:
        db_cursor = db.cursor()
        try:
            print("inserting records...")
            db_cursor.executemany("INSERT INTO records(date, start, end, comment) VALUES (?,?,?,?)", records)
            print("inserting estimates...")
            db_cursor.executemany("INSERT INTO estimates(year, quarter, estimate_0, estimate_1, estimate_2) VALUES (?,?,?,?,?)", estimates)

            print("committing...")
            db.commit()
        finally:
            db_cursor.close()
    finally:
        # Always release the connection. If an insert or the commit failed,
        # closing without a commit discards the uncommitted rows.
        db.close()

    print("Records and estimates populated")
|
|
|
|
|
|
# Run the import only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|