#!/usr/bin/python3
## Hemiptera:
## Simple E-Mail-based Bugtracker
#
# DIRECTORY STRUCTURE
# basedir
# ├── bugcount                 ← File keeping track of the count of bugs
# ├── bugcount.bak             ← Previous value of bugcount
# ├── bugs                     ← For direct access to bugs for fast reply indexing
# │   ├── 388 ⇒ basedir/projects/foo-bar/388
# │   └── 498 ⇒ basedir/projects/bar-baz/498
# ├── inbox                    ← Emails arrive here
# ├── lock ⇒ hemiptera:$PID    ← Filesystem lock
# ├── outbox
# │   ├── 1529069051.7963576.bugreply
# │   └── 1529069051.7971687.bugreply
# ├── projects                 ← Directory containing bugs, grouped by project
# │   ├── foo-bar
# │   │   ├── 388              ← Directory containing messages for bug 388
# │   │   │   ├── 1528380502.4440382
# │   │   │   ├── 1529069051.7901955
# │   │   │   ├── closed       ← Present once the bug was closed (holds the date)
# │   │   │   └── subscribers  ← File containing subscribers of bug 388
# │   │   └── devs.txt         ← File containing addresses of developers of foo-bar
# │   └── bar-baz
# │       ├── [...]
# └── unconfirmed              ← One directory per confirmation id
#     ├── 0f61caf5ff24b587
#     │   └── message          ← The stored (censored) bug report
#     ├── 8751c708556bb583
#
import email
import email.message
import email.utils
import os
import sys
import time
from email import policy
from email.parser import BytesParser, Parser
from email.utils import parseaddr

# Supplies the names this file relies on but does not define itself:
# opj, basedir, bugdir, inbox, outbox, prlist, DOMAIN and get_devs.
from hemiptera_common import *

# Deliberately imported AFTER the star import so that a same-named export of
# hemiptera_common can never shadow these modules.
import contextlib
import glob
import shutil

LOCKPATH = opj(basedir, "lock")


def _message_paths(bug_path):
    """Return the message files of the bug directory bug_path, sorted by name.

    Message files are the entries whose name starts with a digit.  They are
    written under the name str(time.time()), so sorting by name puts the
    first message of the bug at index 0.
    """
    return sorted(glob.glob(os.path.join(bug_path, "[0-9]*")))


def _read_message(path):
    """Parse the e-mail stored at path using the modern (default) policy."""
    with open(path, "rb") as f:
        return BytesParser(policy=policy.default).parse(f)


def _plain_text(m):
    """Return the text/plain body of m, or "" when it cannot be extracted.

    Mail arrives from the outside and may be malformed in arbitrary ways; an
    unreadable body must not abort processing, so failures are reported on
    stderr and treated as an empty body.
    """
    try:
        body = m.get_body(preferencelist=("plain",))
        if body is None:
            return ""
        return body.get_content()
    except Exception as err:
        sys.stderr.write("WARNING: could not read message body: %s\n" % err)
        return ""


class Bug:
    """Object view of one bug directory (messages plus subscribers file)."""

    def __init__(self, path):
        """Remember the bug directory and index its message files."""
        self.path = path
        self.replies = _message_paths(path)

    def get_subs(self):
        """Return the list of subscriber addresses ([] if there are none)."""
        # Inside a method the bare name resolves to the module-level function.
        return get_subs(self.path)

    def set_subs(self, subs):
        """Replace the subscriber list of this bug with subs."""
        write_subs(self.path, subs)

    def get_message(self, i):
        """Return the i-th message of the bug (0 is the original report)."""
        return _read_message(self.replies[i])

    def get_op(self):
        """Return the From header of the original report."""
        return self.get_message(0)["From"]

    def get_subject(self):
        """Return the Subject header of the original report."""
        return self.get_message(0)["Subject"]


def check_lock():
    """Exit with status 255 if the base directory is currently locked."""
    if os.path.islink(LOCKPATH):
        sys.stderr.write(
            "ERROR: Directory locked by " + os.readlink(LOCKPATH) + ". Aborting\n"
        )
        sys.exit(255)


def get_lock():
    """ Get a filesystem lock, as emacs does it (symlink to PID)"""
    check_lock()
    os.symlink("hemiptera:" + str(os.getpid()), LOCKPATH)


def remove_lock():
    """Release the filesystem lock taken by get_lock()."""
    os.remove(LOCKPATH)


@contextlib.contextmanager
def _locked():
    """Hold the filesystem lock for the duration of a with-block.

    The lock is released even when the block raises, so a failing message
    does not leave a stale lock that blocks every later run.
    """
    get_lock()
    try:
        yield
    finally:
        remove_lock()


def get_subs(bug):
    """ Get e-mail addresses of people who subscribed to bug

    bug is the path of the bug directory.  Returns a list; it is empty when
    the bug has no subscribers file.  Blank lines are skipped.
    """
    try:
        with open(os.path.join(bug, "subscribers"), "r") as f:
            return [addr for addr in (line.strip() for line in f) if addr]
    except FileNotFoundError:
        return []


def write_subs(bug, subs):
    """Write the addresses in subs, one per line, to bug's subscribers file."""
    with open(os.path.join(bug, "subscribers"), "w") as f:
        f.write("\n".join(subs))


def get_op(bug):
    """Return the From header of the first message of the bug directory bug.

    Raises IndexError when the directory contains no message yet.
    """
    return _read_message(_message_paths(bug)[0])["From"]


def write_message(path, m):
    """ Write message m to path """
    with open(path, "wb") as f:
        f.write(m.as_bytes())


def get_message_data(m):
    """ returns subject and content of message

    The content is the text/plain body, or "" if there is none.  The subject
    is None when the message has no Subject header.
    """
    return m["Subject"], _plain_text(m)


def get_from_addr(m):
    """ returns the from address of a message ("" if it is missing) """
    return parseaddr(m["From"] or "")[1]


def get_to_addr(m):
    """ returns the to address of a message ("" if it is missing) """
    return parseaddr(m["To"] or "")[1]


def generate_confirmation_id():
    """ Generates a random confirmation ID (16 hexadecimal characters) """
    return os.urandom(8).hex()


def create_censored_message(msg, TrustedDate=False):
    """This transforms a mail message received by the script into a
    message which was stripped of any names and other personal information.

    Saved stuff:
    - Email address (No name)
    - Date
    - Subject
    - Body
    - references and In-Reply-To (if they exist)

    In a further step, when generating HTML, e-mail addresses will also
    get stripped.

    The TrustedDate Parameter indicates whether the date from the message is
    to be trusted. Otherwise, it is replaced with the date at which this
    function is run.

    A fresh Message-ID in DOMAIN is always generated.  Returns the new
    message; msg itself is not modified.
    """
    m = email.message.EmailMessage()
    m.set_payload(_plain_text(msg), charset="utf-8")
    m["From"] = get_from_addr(msg)
    m["To"] = get_to_addr(msg)
    # A missing Subject reads as None, which cannot be assigned to a header.
    m["Subject"] = msg["Subject"] or ""
    if TrustedDate and "Date" in msg:
        m["Date"] = msg["Date"]
    else:
        m["Date"] = email.utils.formatdate()
    for name in ("In-Reply-To", "References"):
        if name in msg:
            m[name] = msg[name]
    m["Message-ID"] = email.utils.make_msgid(domain=DOMAIN)
    return m


def gen_message(body, subject, to, fr, headers=None):
    """Queue an outgoing mail by writing it into the outbox directory.

    body, subject, to and fr become the payload and the Subject, To and From
    headers.  The file is named after the current time with the suffix
    ".bugreply".  headers is accepted for interface compatibility but is
    currently not used.
    """
    r = email.message.EmailMessage()
    r["To"] = to
    r["From"] = fr
    # subject may be None when the triggering mail had no Subject header.
    r["Subject"] = subject or ""
    r.set_payload(body, charset="utf-8")
    os.makedirs(outbox, exist_ok=True)
    write_message(opj(outbox, str(time.time()) + ".bugreply"), r)


def reply(body, subject, to, bugid):
    """ This function generates a reply message

    The sender is <bugid>@DOMAIN, so answers are routed back to the bug.
    """
    fr = str(bugid) + "@" + DOMAIN
    gen_message(body, subject, to, fr)


def get_project(bug):
    """Get project name for bug path pointing into global bug directory"""
    dest = os.readlink(bug)
    # The link target ends in .../<project>/<bug id>.
    return dest.split(os.sep)[-2]


def create_unconfirmed_bug(msg):
    """Store msg as an unconfirmed report and ask its sender to confirm it.

    The censored message is saved as unconfirmed/<confirmation id>/message
    and a mail containing the confirmation id is queued for the sender.
    """
    with _locked():
        fromaddr = get_from_addr(msg)
        confid = generate_confirmation_id()
        print("ID: " + confid)
        bugpath = os.path.join(basedir, "unconfirmed", confid)
        subject, _ = get_message_data(msg)
        # Store the report before queueing the request, so that a
        # confirmation id is never mailed out without a report behind it.
        os.makedirs(bugpath)
        write_message(os.path.join(bugpath, "message"), create_censored_message(msg))
        gen_message(
            "Your bug has been registered. To prevent spam, you need to "
            "confirm it by answering to this message with the following "
            "confirmation id: " + confid + ". Just hitting reply should work.",
            subject,
            fromaddr,
            "confirm-bug@" + DOMAIN,
        )


def _next_bug_id():
    """Increment the bug counter in basedir/bugcount and return the new value.

    The previous value is kept in basedir/bugcount.bak.  A missing counter
    file counts as 0.  Must be called with the lock held.
    """
    count_path = opj(basedir, "bugcount")
    try:
        with open(count_path, "r") as f:
            bugcount = int(f.read())
    except FileNotFoundError:
        bugcount = 0
    else:
        # Only overwrite the backup once the current value was read fine.
        with open(opj(basedir, "bugcount.bak"), "w") as bak:
            bak.write(str(bugcount))
    bugcount += 1
    with open(count_path, "w") as f:
        f.write(str(bugcount))
    return bugcount


def create_bug(prname, msg):
    """Create a new bug in project prname from the message msg.

    Allocates the next bug id, creates projects/<prname>/<id> plus the
    bugs/<id> symlink, stores the censored message, subscribes the project's
    developers and the reporter, and queues the report to every subscriber.
    """
    with _locked():
        bugcount = _next_bug_id()
        bugpath = os.path.join(basedir, "projects", prname, str(bugcount))
        os.makedirs(bugpath)
        # Copy, so appending below cannot modify what get_devs handed out.
        subs = list(get_devs(prname))
        fromaddr = get_from_addr(msg)
        if fromaddr and fromaddr not in subs:
            subs.append(fromaddr)
        os.symlink(
            bugpath,
            os.path.join(bugdir, str(bugcount)),
            target_is_directory=True,
        )
        m = create_censored_message(msg)
        subject, content = get_message_data(m)
        write_message(os.path.join(bugpath, str(time.time())), m)
        write_subs(bugpath, subs)
        print(
            " You have sent a bug report to %s. "
            "It has been processed under the id %d " % (prname, bugcount)
        )
        for sub in subs:
            ### Send out an e-mail to each subscriber
            reply(content, subject, sub, str(bugcount))


def reply_to_bug(bugname, msg):
    """Append msg to the existing bug bugname and notify its subscribers.

    The sender is added to the subscribers if not already subscribed.
    """
    with _locked():
        bug = os.path.join(bugdir, bugname)
        fromaddr = get_from_addr(msg)
        cm = create_censored_message(msg)
        subject, content = get_message_data(cm)
        write_message(os.path.join(bug, str(time.time())), cm)
        print(" Your reply to bug ID %s has been processed " % bugname)
        subs = get_subs(bug)
        if fromaddr and fromaddr not in subs:
            subs.append(fromaddr)
            write_subs(bug, subs)
        for sub in subs:
            reply(content, subject, sub, bugname)


def process_command(bugname, msg):
    """Execute the command contained in msg on the bug bugname.

    The only command is "%close": it writes the message date into the bug's
    "closed" file and notifies every subscriber.
    """
    bug = os.path.join(bugdir, bugname)
    cm = create_censored_message(msg)
    _, content = get_message_data(cm)
    if "%close" in content:
        # Taken for the same reason as in the other handlers that modify a
        # bug directory and the outbox.
        with _locked():
            with open(opj(bug, "closed"), "w") as f:
                f.write(cm["Date"])
            for sub in get_subs(bug):
                reply("This bug was closed", "Bug #%s closed" % bugname, sub, bugname)


def import_bugs():
    """ Imports bugs from Inbox

    Every file in the inbox is parsed, processed and then deleted.  A file
    is only deleted after it was processed without raising.
    """
    os.makedirs(inbox, exist_ok=True)
    for name in sorted(os.listdir(inbox)):
        path = opj(inbox, name)
        process_message(_read_message(path))
        os.remove(path)


def process_message(m):
    """Dispatch the incoming message m according to its recipient.

    The local part of the To address selects the action:
    - a project name         → store as unconfirmed bug, request confirmation
    - a bug id               → reply to that bug
    - <bug id>-command       → run a command on that bug
    - confirm-bug...         → confirm a stored report whose id is in the body
    Bounces from MAILER-DAEMON are ignored.
    """
    if "MAILER-DAEMON" in get_from_addr(m):
        return
    os.makedirs(bugdir, exist_ok=True)
    dest = get_to_addr(m).split("@")[0]
    known_bugs = set(os.listdir(bugdir))
    parts = dest.split("-")

    if dest in prlist:
        create_unconfirmed_bug(m)
    elif dest in known_bugs:
        reply_to_bug(dest, m)
    elif len(parts) > 1 and parts[0] in known_bugs and parts[1] == "command":
        ### Process command
        print("Command spotted")
        process_command(parts[0], m)
    elif dest.startswith("confirm-bug"):
        unconf_dir = opj(basedir, "unconfirmed")
        os.makedirs(unconf_dir, exist_ok=True)
        _, content = get_message_data(m)
        for confid in os.listdir(unconf_dir):
            if confid in content:
                stored = _read_message(opj(unconf_dir, confid, "message"))
                # The stored report was addressed to <project>@DOMAIN.
                project = get_to_addr(stored).split("@")[0]
                create_bug(project, stored)
                shutil.rmtree(opj(unconf_dir, confid))
                # Return (not exit) so the caller can delete this mail and
                # carry on with the rest of the inbox.
                return
        print("Invalid activation id")
    else:
        print("Invalid message")


# Runs on import as well as on direct execution.
import_bugs()