Source code for larigira.timegen
"""
main module to read and get informations about alarms
"""
import sys
from datetime import datetime
import argparse
import json
from .entrypoints_utils import get_one_entrypoint
from logging import getLogger
log = getLogger("timegen")
[docs]
def get_timegenerator(kind):
"""Messes with entrypoints to return an timegenerator function"""
return get_one_entrypoint("larigira.timegenerators", kind)
[docs]
def get_parser():
parser = argparse.ArgumentParser(
description='Generate "ring times" from a timespec'
)
parser.add_argument(
"timespec",
metavar="TIMESPEC",
type=str,
nargs=1,
help="filename for timespec, formatted in json",
)
parser.add_argument(
"--now",
metavar="NOW",
type=int,
nargs=1,
default=None,
help='Set a different "time", in unix epoch',
)
parser.add_argument(
"--howmany",
metavar="N",
type=int,
nargs=1,
default=[1],
help='Set a different "time", in unix epoch',
)
return parser
[docs]
def read_spec(fname):
try:
if fname == "-":
return json.load(sys.stdin)
with open(fname) as buf:
return json.load(buf)
except ValueError:
sys.stderr.write("Error: invalid JSON\n")
sys.exit(1)
[docs]
def check_spec(spec):
if "kind" not in spec:
yield "Missing field 'kind'"
[docs]
def timegenerate(spec, now=None, howmany=1):
Alarm = get_timegenerator(spec["kind"])
generator = Alarm(spec)
if now is not None:
if type(now) is not datetime:
now = datetime.fromtimestamp(now)
for _ in range(howmany):
now = generator.next_ring(current_time=now)
yield now
[docs]
def main():
"""Main function for the "larigira-timegen" executable"""
args = get_parser().parse_args()
spec = read_spec(args.timespec[0])
errors = tuple(check_spec(spec))
if errors:
log.error("Errors in timespec")
for err in errors:
sys.stderr.write("Error: {}\n".format(err))
sys.exit(1)
now = None if args.now is None else args.now.pop()
howmany = None if args.howmany is None else args.howmany.pop()
for time in timegenerate(spec, now=now, howmany=howmany):
print(time)