Stundenaufzeichnung/scripts/populate.py

89 lines
3.1 KiB
Python

import os.path
import sys
from argparse import ArgumentError
import sqlite3
def get_records(records):
extracted = []
with open(records, 'r', encoding='utf-8') as file:
for line in file:
line = line.strip('\n')
values = line.split(',', 3)
values[0] = values[0][8:] + "." + values[0][5:7] + "." + values[0][0:4]
values[3] = values[3].strip('"')
extracted.append(tuple(values))
return extracted
def get_estimates(estimates):
extracted = []
with open(estimates, 'r', encoding='utf-8') as file:
for line in file:
line = line.strip('\n')
values = line.split(',', 1)
extracted.append(tuple(values))
expanded = [extracted[0]]
for i, row in enumerate(extracted[1:]):
next_month = f'{int(expanded[-1][0][5:7]) + 1:02d}' if int(expanded[-1][0][5:7]) < 12 else "01"
next_date = f'{expanded[-1][0][:4] if int(expanded[-1][0][5:7]) < 12 else f'{int(expanded[-1][0][:4]) + 1:04d}'}-{next_month}'
while(next_date != row[0]):
expanded.append((next_date, None))
next_month = f'{int(expanded[-1][0][5:7]) + 1:02d}' if int(expanded[-1][0][5:7]) < 12 else "01"
next_date = f'{expanded[-1][0][:4] if int(expanded[-1][0][5:7]) < 12 else f'{int(expanded[-1][0][:4]) + 1:04d}'}-{next_month}'
expanded.append(row)
estimates = list(zip([int(r[0][:4]) for r in expanded][::3], [int(r[0][5:])//3 + 1 for r in expanded][::3], [r[1] for r in expanded][::3], [r[1] for r in expanded][1::3], [r[1] for r in expanded][2::3]))
estimates = list(filter(lambda e: not (e[2] == '' and e[3] == '' and e[4] == ''), estimates))
return estimates
def main():
if len(sys.argv) != 4:
raise ArgumentError(argument=None, message='Usage: python3 populate.py <userdb> <records> <estimates>')
userdb = sys.argv[1]
records_file = sys.argv[2]
estimates_file = sys.argv[3]
if not os.path.exists(userdb):
raise ArgumentError(argument=None, message='userdb must exist')
if not os.path.exists(records_file):
raise ArgumentError(argument=None, message=f'records file does not exist: {records_file}')
if not os.path.exists(estimates_file):
raise ArgumentError(argument=None, message=f'estimates file does not exist: {estimates_file}')
print("reading data...")
records = get_records(records_file)
estimates = get_estimates(estimates_file)
db = sqlite3.connect(userdb)
db_cursor = db.cursor()
print("inserting records...")
db_cursor.executemany("INSERT INTO records(date, start, end, comment) VALUES (?,?,?,?)", records)
print("inserting estimates...")
db_cursor.executemany("INSERT INTO estimates(year, quarter, estimate_0, estimate_1, estimate_2) VALUES (?,?,?,?,?)", estimates)
print("committing...")
db.commit()
db_cursor.close()
db.close()
print("Records and estimates populated")
if __name__ == '__main__':
main()