GitLab will be upgraded on 30 Jan 2023 from 2.00 pm (AEDT) to 3.00 pm (AEDT). During the update, GitLab and Mattermost services will not be available. If you have any concerns with this, please talk to us at N110 (b) CSIT building.

Commit c4937d0a authored by Lincoln Smith's avatar Lincoln Smith
Browse files

Removed scripts moved to different repo.

parent 67420cb6
"""
Basic script for grabbing the details of courses from the P&C website and popping them into CIMS
via the API.
"""
# Byron Vickers May 2017
# Lincoln Smith 2018
import json
import logging
import sys
from datetime import date, datetime
import requests
from cims.contrib.pandcscraper.pandc_scraper import PandCScraper
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger()
COURSE_TEMPLATE = {
"code": None,
}
VERSION_TEMPLATE = {
"course": None,
"version_descriptor": "Initial version -- scraped from P&C",
"status": None,
"status_freetext": "",
"approval_date": date.today().isoformat(),
"approval_desc": "Scraped from Programs and Courses by automated script",
"title": None,
"description": None,
"delivery_mode": None,
'assumed_knowledge': '',
"prerequisites_courses": None,
"corequisite_courses": "",
"incompatible_courses": "",
"research_led_teaching": "",
"required_resources": "",
"additional_costs": "",
"participation": "",
"field_trips": "",
"recommende_resources": "",
"other_information": "",
}
INSTANCE_TEMPLATE = {
"courseversion": None,
"instance_descriptor": None,
"start_date": None,
"end_date": None,
"class_number": None,
"teachingsession": None,
"delivery_mode": None,
"inherent_requirements": "",
"workload": "",
"prescribed_texts": "",
"preliminary_reading_list": "",
"indicative_reading_list": "",
"staff_feedback": "",
"course_url": "",
"exams": "",
"exam_material": "",
"online_submission": "The ANU uses Turnitin to enhance student citation and referencing "
"techniques, and to assess assignment submissions as a component of the "
"University's approach to managing Academic Integrity. While the use of "
"Turnitin is not mandatory, the ANU highly recommends Turnitin is used "
"by both teaching staff and students. For additional information "
"regarding Turnitin please visit the ANU Online website.",
"late_submission_policy": "",
"resubmission_policy": "",
"assignment_return": "",
"reference_requirements": "",
"hard_copy_submission": "",
"other_information": "",
"taught_by": None,
}
def main(args, pword):
session = requests.Session()
if args.uname:
session.auth = (args.uname, pword)
# get API endpoints the "right" way
# We're going to be very blase about request error handling for read queries,
# but we'll need to be more careful later when we're actually writing to the CIMS
endpoints_resp = session.get(args.api_root)
endpoints_resp.raise_for_status()
endpoints = endpoints_resp.json() # type: dict
logging.debug(endpoints)
# If we're meant to be operating over https make sure the API is honoring it (Gunicorn has
# quietly broken this in the past).
if args.api_root.startswith('https://'):
for api_url in endpoints.values():
if not api_url.startswith('https://'):
raise Exception("API isn't returning https urls.")
# get the "static" items we need (lists of course modes, teaching sessions, etc.)
def get_list(modelname):
return requests.get(endpoints[str(modelname)], params={'limit': '10000'}).json()['results']
teachingsessions = get_list("teachingsession")
deliverymodes = get_list("deliverymode")
ous = get_list("organisationalunit")
rscs = next((ou for ou in ous if ou['short_name'] == 'RSCS'))['id']
rseng = next((ou for ou in ous if ou['short_name'] == 'RSEng'))['id']
statuses = get_list("reviewstatus")
approved_nonauth = [
status for status in statuses if status["name"] == "Approved - Not Authoritative"
][0]
VERSION_TEMPLATE["status"] = approved_nonauth["id"]
courses_json = get_list("course")
cims_course_codes = set([c["code"] for c in courses_json])
args.verbose and logging.info("COURSELIST: {}".format(args.courses))
for code in args.courses:
# If we're initialising courses, check if it already exists in CIMS, and if not try to
# import it.
if args.initialise_course and code in cims_course_codes:
logging.info(
"Code {} is already in the cims course list, skipping "
"initialisation".format(code)
)
continue
logging.info("Importing course data for course code {}".format(code))
try:
scraper = PandCScraper(code)
args.verbose and logging.info("Imported P&C data successfully")
except requests.exceptions.HTTPError as e:
logging.error('Failed to scrape {}, skipping. Error was: {}'.format(code, str(e)))
continue
course_id = None
version_id = None
if args.initialise_course:
# Create a course
course_data = {
"code": code,
}
if args.dry_run:
logging.info(course_data)
else:
course_resp = session.post(endpoints["course"], json=course_data)
if not course_resp.ok:
logging.error("Could not create course for code {}. Erroring "
"out...".format(code))
logging.error("Submission was as follows: {}".format(course_data))
logging.error("Response was as follows: {}".format(course_resp.text))
raise Exception("Failed to create course {}".format(code))
args.verbose and logging.info(course_resp.json())
course_id = course_resp.json()["id"]
# Create a version
version_data = VERSION_TEMPLATE.copy()
try:
d_mode = next((m['id'] for m in deliverymodes if m['mode'] in scraper.get_mode()))
except StopIteration:
d_mode = None
version_data.update({
"course": course_id,
"title": scraper.get_title(),
"description": scraper.get_introduction(),
"delivery_mode": d_mode,
"prerequisites": scraper.get_req_and_incomp() or '',
"workload": scraper.get_workload() or '',
"prescribed_texts": scraper.get_prescribed_texts() or '',
"other_info": scraper.get_other_information() or ''
})
if args.dry_run:
logging.info(version_data)
else:
version_resp = session.post(endpoints["courseversion"], json=version_data)
if not version_resp.ok:
logging.error("Could not create version for code {}. Erroring "
"out...".format(code))
logging.error("Submission was as follows: {}".format(version_data))
logging.error("Response was as follows: {}".format(version_resp.text))
raise Exception("Failed to create version for course {}".format(code))
args.verbose and logging.info(version_resp.json())
version_id = version_resp.json()["id"]
# Learning Outcomes
lo_ids = []
for outcome in scraper.get_learning_outcomes():
lo_data = {
"courseversion": version_id,
"outcome": outcome,
}
if args.dry_run:
logging.info(lo_data)
else:
lo_resp = session.post(endpoints["learningoutcome"], json=lo_data)
if not lo_resp.ok:
logging.error("Could not create learning outcome for code {}. Erroring "
"out...".format(code))
logging.error("Submission was as follows: {}".format(json.dumps(lo_data)))
logging.error("Response was as follows: {}".format(lo_resp.text))
raise Exception("Failed to create learning outcome for CourseVersion "
"id:{}".format(version_id))
args.verbose and logging.info(lo_resp.json())
lo_ids.append(lo_resp.json()["id"])
def tidy_loop():
cims_course_codes.add(code)
logging.info('Succesfully updated/initialised {}'.format(code))
if not args.update_classes:
tidy_loop()
continue
# Get a courseversion if we didn't create one above
if args.update_classes and version_id is None:
courseversions = requests.get(endpoints['courseversion'],
params={'course__code': code}).json()['results']
try:
version_id = next(
(
cv['id'] for cv in courseversions if cv['status'] in
[
status['id'] for status in statuses if
status["name"].startswith('Approved')
]
)
)
except StopIteration:
logging.error("Couldn't find an approved CourseVersion for {}, "
"skipping update of classes".format(code))
continue
# Course Instances
def date_convert(datestr):
return datetime.strptime(datestr, "%d %b %Y").date().isoformat()
def semester_pk_lookup(ci):
start_date = date_convert(ci["Class start date"])
end_date = date_convert(ci["Class end date"])
pk_fname = "id"
id_opts = [
ts[pk_fname] for ts in teachingsessions
if ts['start_date'] == start_date
and ts['end_date'] == end_date
]
if id_opts:
return id_opts[0]
else:
return None
this_year = str(date.today().year)
if code.startswith('ENGN'):
taught_by = [rseng, ]
else:
taught_by = [rscs, ]
courseinstances = requests.get(endpoints['courseinstance'],
params={'course__code': code}).json()['results']
for year, semesters in scraper.get_offerings_all().items():
if not year == this_year:
continue
for semester_name, semester_classes in semesters.items():
for ci in semester_classes:
semester_pk = semester_pk_lookup(ci)
ci_data = INSTANCE_TEMPLATE.copy()
ci_data.update({
"courseversion": version_id,
"class_number": ci["Class number"],
"teachingsession": semester_pk,
"instance_descriptor": "{}, {} {}".format(code, semester_name, year),
"start_date": date_convert(ci["Class start date"]),
"end_date": date_convert(ci["Class end date"]),
"delivery_mode": [
m["id"] for m in deliverymodes
if m["mode"] == ci["Mode Of Delivery"]
][0],
'taught_by': taught_by,
})
if [instance for instance in courseinstances
if (ci_data['start_date'] == instance['start_date']
and ci_data['end_date'] == instance['end_date'])]:
logging.info('Existing class with start date {} and end date {} found. '
'Skipping'.format(ci_data['start_date'], ci_data['end_date']))
continue
if args.dry_run:
logging.info(ci_data)
else:
ci_resp = session.post(endpoints["courseinstance"], json=ci_data)
if not ci_resp.ok:
logging.error("Could not create course instance for code {}. Erroring "
"out...".format(code))
logging.error("Submission was as follows: {}".format(ci_data))
logging.error("Response was as follows: {}".format(ci_resp.text))
raise Exception(
"Failed to create course instance for course {}".format(code)
)
args.verbose and logging.info(ci_resp.json())
logging.info('Added class running: {} - {}.'.format(
ci_data['start_date'],
ci_data['end_date']
))
tidy_loop()
if __name__ == "__main__":
import argparse
import getpass
parser = argparse.ArgumentParser(
description='Inject data scraped from P&C into CIMS via its API.'
)
parser.add_argument('courses', type=str, default=[], nargs='*',
help='Course codes for courses we want to process.'
)
parser.add_argument('-a', '--api-root', type=str, default=None, required=True,
help='URL of the API root.'
)
parser.add_argument('-f', '--file', type=str, default=None,
help='Filename containing course codes to batch process.'
)
parser.add_argument('-u', '--uname', type=str, default=None,
help='Username to access API with.'
)
parser.add_argument('-p', '--password', type=str, default=None,
help='Password for API access. Script will prompt for a password if '
'a username but no password is given.'
)
parser.add_argument('-I', '--initialise-course', action='store_true',
help='If the course doesn\'t exist in CIMS, create it along with a '
'CourseVersion and CourseInstances for the current year using '
'information scraped from P&C.'
)
parser.add_argument('-U', '--update-classes', action='store_true',
help='Create classes for existing courses using teaching session '
'data from P&C.'
)
parser.add_argument('-l', '--logfile', type=str, default=None,
help='File to log output to.'
)
parser.add_argument('-d', '--dry-run', action='store_true',
help="Don't perform update, print what would have been sent."
)
parser.add_argument('-v', '--verbose', action='store_true',
help='Mostly dump details of created objects.'
)
args = parser.parse_args()
assert args.initialise_course or args.update_classes, (
'At least one of -I or -U required.'
)
args.logfile and logger.addHandler(logging.FileHandler(args.logfile))
if args.file:
codes = set(args.courses)
with open(args.file, 'r') as fp:
codes.update(set(fp.read().splitlines()))
args.courses = list(codes)
args.courses.sort()
assert args.courses, 'No course codes found in batch file or command line arguments.'
if args.uname and not args.password:
pword = getpass.getpass("Password: ")
else:
pword = args.password
main(args, pword)
#!/usr/bin/env python
# Lincoln Smith 2018
import logging
import smtplib
from email.headerregistry import Address
from email.message import EmailMessage
def main():
with open(args.recipients) as recipients:
lines = [line.split(',') for line in recipients]
address_pairs = [(line[0], line[1].split()[0]) for line in lines]
with smtplib.SMTP(args.smtp_server) as smtp_server:
for address, first_name in address_pairs:
msg = EmailMessage()
msg['Subject'] = args.subject
local_part, domain_part = args.sender_email.split('@')
msg['From'] = Address(args.sender_name, local_part, domain_part)
local_part, domain_part = address.split('@')
msg['To'] = Address(first_name, local_part, domain_part)
with open(args.message_body) as content_file:
msg.set_content(content_file.read().format(first_name))
with open(args.message_html) as content_file:
msg.add_alternative(content_file.read().format(first_name), subtype='html')
args.verbose and logger.debug(msg.as_string())
smtp_server.send_message(msg)
logger.info('Email sent to: {}'.format(address))
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description='Email a bunch of people.'
)
parser.add_argument('sender_name', type=str, help='The name of the sender.')
parser.add_argument('sender_email', type=str, help='The email address of the sender.')
parser.add_argument('subject', type=str, help='The email subject line.')
parser.add_argument('-r', '--recipients', type=str, default=None, required=True,
help='File name of file containing recipients. One "<email_addr>,<name>" '
'pair per line.'
)
parser.add_argument('-b', '--message-body', type=str, default=None, required=True,
help='File name of message text. It will be used as a python format string '
'with the recipient\'s first name.'
)
parser.add_argument('-m', '--message-html', type=str, default=None,
help='File name of HTML formatted message text. It wll be used as a '
'format string with the recipient\'s first name.'
)
parser.add_argument('-s', '--smtp-server', type=str, default='localhost', required=True,
help='Hostname of the SMTP server to use to send the mail'
)
parser.add_argument('-l', '--logfile', type=str, default=None, help='File to log output to.')
parser.add_argument('-v', '--verbose', action='store_true')
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
args.logfile and logger.addHandler(logging.FileHandler(args.logfile))
main()
# Lincoln Smith 2018
import json
import logging
from datetime import datetime
import requests
logging.basicConfig(level="INFO")
logger = logging.getLogger()
def set_conveners(program_args, session, course_code, usernames):
"""
Set the users identified by `usernames` as conveners for CourseInstances of `course_code`
running in `year`. Set for current year if no year given.
"""
datetime.strptime(program_args.start_date, '%Y-%m-%d')
year_start = program_args.start_date
datetime.strptime(program_args.end_date, '%Y-%m-%d')
year_end = program_args.end_date
# Get our CourseInstances that start between our start and end dates
response = requests.get(
'{}courseinstance/'.format(program_args.api_root),
params={'course__code': course_code, 'start_date_0': year_start,
'start_date_1': year_end}
)
try:
response.raise_for_status()
except requests.exceptions.HTTPError:
logger.info('Failed to get CourseInstances for course: {}'.format(course_code))
return
required_fields = ['id', 'instance_descriptor', 'courseversion', 'start_date', 'end_date',
'taught_by']
for courseinstance in response.json()['results']:
ci_update = {}
for fname in required_fields:
ci_update[fname] = courseinstance[fname]
user_ids = set(courseinstance['conveners'])
user_ids.update(set(get_user_ids(program_args, session, usernames)))
if user_ids:
ci_update['conveners'] = list(user_ids)
if program_args.dry_run:
logger.info(
'UPDATE: {}courseinstance/{}/ - {}'.format(
program_args.api_root, ci_update['id'], ci_update
)
)
continue
response = session.put(
'{}courseinstance/{}/'.format(program_args.api_root, ci_update['id']),
data=ci_update,
)
try:
response.raise_for_status()
logger.info(
'SUCCESS - set conveners for class: {}'.format(
courseinstance['instance_descriptor']
)
)
except requests.exceptions.HTTPError:
logger.info(
'Failed to set conveners for class: {}'.format(
courseinstance['instance_descriptor']
)
)
def get_user_ids(program_args, session, usernames):
"""
Look up each username via the API and add their database ID to a list of IDs. If they don't
exist in the database attempt to add them as an LDAP authenticating user.
"""
ids = []
for uname in usernames:
response = session.get('{}user/'.format(program_args.api_root), params={'username': uname})
try:
response.raise_for_status()
except requests.exceptions.HTTPError:
logger.info('Failed to get user: {}'.format(uname))
content = response.json()
# If the user doesn't exist in our database attempt to add them
if not content['count']:
if program_args.dry_run:
logger.info('Skipping ldap add of user: {}'.format(uname))
continue
response = session.post(
'{}user/set_ldap_user/'.format(program_args.api_root),
data={'username': uname}
)
try:
response.raise_for_status()
user = response.json()
ids.append(user['id'])
except requests.exceptions.HTTPError as e:
logger.info("Couldn't populate LDAP user: {}. Error was: {}".format(uname, e))
else:
ids.append(content['results'][0]['id'])
return ids
def main(program_args, password):
session = requests.Session()
if program_args.uname:
session.auth = (program_args.uname, password)
for course_code, usernames in program_args.courses.items():
# If there are convener usernames listed against the course process the course.
any(usernames) and set_conveners(program_args, session, course_code, usernames)
if __name__ == "__main__":
import argparse
import getpass
parser = argparse.ArgumentParser(
description='Set conveners for courses.'
)
parser.add_argument('-a', '--api-root', type=str, default=None, required=True,
help='URL of the API root.'
)
parser.add_argument('-s', '--start-date', type=str, default=None, required=True,
help='The beginning of the start date range in which to set course '
'conveners. Format yyyy-mm-dd.'
)
parser.add_argument('-e', '--end-date', type=str, default=None, required=True,
help='The end of the start date range in which to set course conveners. '
'Format yyyy-mm-dd.'
)
parser.add_argument('-f', '--file', type=str, default=None,
help='JSON formatted file of format: {<course_code>: [uniID,..], ..}'
)
parser.add_argument('-u', '--uname', type=str, default=None,
help='Username to access API with.'
)
parser.add_argument('-p', '--password', type=str, default=None,
help='Password for API access. Script will prompt for a password if '
'a username but no password is given.'
)
parser.add_argument('-l', '--logfile', type=str, default=None,
help='File to log output to.'
)
parser.add_argument('-d', '--dry-run', action='store_true',
help="Don't perform update, print what would have been sent."
)
parser.add_argument('-v', '--verbose', action='store_true',
help='Mostly dump details of created objects.'
)
args = parser.parse_args()
args.logfile and logger.addHandler(logging.FileHandler(args.logfile))
courses = None
if args.file:
with open(args.file, 'r') as fp:
courses = json.loads(fp.read())