The instance where I was using it changed its rules to prevent bots from posting in it, and I didn't care enough to search for another instance.
https://lemm.ee/c/issue_tracker?dataType=Post&page=1&sort=Active
`config_template.py
# Connection settings for the target Lemmy instance; fill these in before running.
LEMMY_INSTANCE_URL = ""
LEMMY_COMMUNITY_NAME = ""
LEMMY_USERNAME = ""
LEMMY_PASSWORD = ""
# Base URLs used to build GitHub API requests and links back to GitHub.
GITHUB_API_BASE = "https://api.github.com"
GITHUB_URL_BASE = "https://github.com"
# Repositories, in "owner/name" form, whose issues are mirrored.
REPOSITORIES = ["LemmyNet/lemmy", "LemmyNet/lemmy-ui"]
# Path of the SQLite database file opened by the script.
DB_FILE = "lemmy_github.db"
# Minimum spacing between GitHub requests; compared against time.time() deltas, i.e. seconds.
DELAY = 1
# Passed as max_time to the backoff decorators - presumably seconds; confirm against the backoff docs.
MAX_BACKOFF_TIME = 300
# GitHub token sent as a Bearer credential in the Authorization header.
PERSONAL_ACCESS_TOKEN = ""
github_lemmy_issue_reposter.py
import contextlib
import datetime
import functools
import logging
import sqlite3
import time
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, TypeVar

import backoff
import requests
import schedule
from pythorhead import Lemmy

from config import *
# Generic return type shared by the decorator helpers in this module.
T = TypeVar("T")

# A more detailed format, kept here for debugging sessions:
# "[%(levelname)s]:%(asctime)s:%(name)s [%(filename)s:%(lineno)s - %(funcName)s()] %(message)s"
FORMAT = "%(message)s"

# Log to both debug.log and the console; mode="w" opens the log file for writing from scratch.
logging.basicConfig(
    handlers=[
        logging.FileHandler("debug.log", mode="w"),
        logging.StreamHandler(),
    ],
    format=FORMAT,
    level=logging.INFO,
)
def on_giveup(details: Dict[str, int]) -> None:
    """Log that a retried call has run out of attempts.

    Registered as the ``on_giveup`` handler of the ``backoff.on_exception``
    decorators in this file.

    Args:
        details: Retry details from the caller; only ``details["tries"]`` is read.
    """
    attempts = details["tries"]
    logging.error(f"Failed to fetch issues after {attempts} attempts", exc_info=True)
def handle_errors(message: Optional[str] = None) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Build a decorator that logs any exception from the wrapped function, then re-raises it.

    Args:
        message: Optional context placed in front of the logged error text.

    Returns:
        A decorator. The decorated function returns whatever the original
        returns; on failure the exception is logged with its traceback and
        propagated unchanged to the caller.
    """
    prefix = f"{message} - " if message else ""

    def decorator(function: Callable[..., T]) -> Callable[..., T]:
        # functools.wraps keeps the wrapped function's __name__ and __doc__,
        # so logs and introspection still identify the real function.
        @functools.wraps(function)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            try:
                return function(*args, **kwargs)
            except Exception as e:
                logging.exception(f"{prefix}Error in {function.__name__}:\n{e}")
                raise

        return wrapper

    return decorator
class GitHubIssue:
    """A GitHub issue reduced to the fields needed to mirror it as a Lemmy post."""

    def __init__(self, issue_dict: dict[str, Any], github_repo: str) -> None:
        """Extract the mirrored fields from one GitHub issue payload.

        Args:
            issue_dict: Issue object; ``html_url``, ``state``, ``title``,
                ``number``, ``body``, ``user.login``, ``user.html_url`` and
                ``updated_at`` are read.
            github_repo: Repository name; a name containing ``lemmy-ui`` is
                tagged ``[UI]``, anything else ``[BE]``.

        Raises:
            Exception: Whatever extraction raised (e.g. ``KeyError`` for a
                missing field, ``ValueError`` for a malformed timestamp),
                re-raised after logging.
        """
        try:
            self.url = issue_dict["html_url"]
            logging.info(f"Creating issue {self.url}")
            self.state = issue_dict["state"]
            self.state_fmt = "[Closed]" if issue_dict["state"] == "closed" else ""
            self.repo_abbr = "[UI]" if "lemmy-ui" in github_repo else "[BE]"
            self.title = f"{self.state_fmt}{self.repo_abbr} {issue_dict['title']} #{issue_dict['number']}"
            # Keep the title within 200 characters.
            self.title = self.title[:200]
            self.body = issue_dict["body"]
            if self.body is not None:
                # Keep the body within 30000 characters.
                self.body = self.body[:30000]
            self.user = issue_dict["user"]["login"]
            self.user_url = issue_dict["user"]["html_url"]
            self.updated_at = datetime.datetime.strptime(issue_dict["updated_at"], '%Y-%m-%dT%H:%M:%SZ')
        except Exception:
            # Attributes assigned after the failing line do not exist yet, so
            # read them defensively; touching them directly would raise
            # AttributeError here and hide the real error.
            parsed = {
                name: getattr(self, name, None)
                for name in ("state", "repo_abbr", "title", "url", "user", "user_url", "updated_at")
            }
            log_message: str = (
                f"Formatted issue:\n"
                f" - Repo: {github_repo}\n"
                f" - Issue State: {parsed['state']}\n"
                f" - Repo Abbreviation: {parsed['repo_abbr']}\n"
                f" - Title: {parsed['title']}\n"
                f" - URL: {parsed['url']}\n"
                f" - User: {parsed['user']}\n"
                f" - User URL: {parsed['user_url']}\n"
                f" - Updated At: {parsed['updated_at']}\n"
            )
            logging.exception(log_message)
            # A half-initialised issue is unusable; let the caller see the real error.
            raise

    @property
    def formatted_body(self) -> Optional[str]:
        """Return the body as a Markdown quote with an attribution footer.

        Returns ``None`` when the issue has no body. If formatting fails, the
        error is logged and the best result obtained so far is returned.
        """
        if self.body is None:
            return None
        formatted_body: str = self.body
        try:
            formatted_body = self.body.replace("\n", "\n> ")
            formatted_body = f"> {formatted_body}\n> \n> *Originally posted by [{self.user}]({self.user_url}) in [#{self.number}]({self.url})*"
        except Exception as e:
            logging.exception(f"Error formatting body for {self.url}\n{e}")
        return formatted_body

    @property
    def number(self) -> int:
        """Return the issue number, parsed from the last path segment of ``url``."""
        return int(self.url.rsplit("/", 1)[-1])
class GitHubComment:
    """A GitHub issue comment reduced to the fields needed to mirror it on Lemmy."""

    def __init__(self, comment_dict: dict[str, Any], issue_number: int) -> None:
        """Extract the mirrored fields from one GitHub comment payload.

        Args:
            comment_dict: Comment object; ``id``, ``body``, ``user.login``,
                ``user.html_url`` and ``html_url`` are read.
            issue_number: Number of the issue the comment belongs to.

        Raises:
            KeyError: If a required field is missing from ``comment_dict``.
        """
        self.id = comment_dict["id"]
        self.body = comment_dict["body"]
        self.user = comment_dict["user"]["login"]
        self.user_url = comment_dict["user"]["html_url"]
        self.url = comment_dict["html_url"]
        self.issue_number = issue_number

    @property
    def formatted_comment(self) -> str:
        """Return the comment as a Markdown quote with an attribution footer.

        A missing (``None``) body is treated as empty text so that the
        attribution footer is still produced instead of raising.
        """
        body = self.body if self.body is not None else ""
        quoted = body.replace("\n", "\n> ")
        return f"> {quoted}\n> \n> *Originally posted by [{self.user}]({self.user_url}) in [#{self.issue_number}]({self.url})*"
@handle_errors("Error initializing database")
def initialize_database() -> sqlite3.Connection:
    """Create the ``posts``, ``comments`` and ``last_updated`` tables if they are missing.

    Returns:
        The open connection to ``DB_FILE``; the caller is responsible for closing it.

    Raises:
        sqlite3.Error: If the schema cannot be created. The connection is
            closed before the error propagates.
    """
    logging.info("Initializing database")
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    try:
        cursor: sqlite3.Cursor = conn.cursor()
        # NOTE(review): issue_number is the primary key, but issue numbers come from
        # several repositories; equal numbers in two repositories look like they
        # would collide here - confirm.
        cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS posts (
                issue_number INTEGER PRIMARY KEY,
                lemmy_post_id INTEGER NOT NULL UNIQUE,
                issue_title TEXT,
                issue_body TEXT,
                updated_at TIMESTAMP DEFAULT NULL
            )
            """
        )
        # The comma after "comment_body TEXT" matters: without it SQLite reads the
        # following tokens as part of that column's type name and no updated_at
        # column is created. Tables that already exist are not altered by this.
        cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS comments (
                github_comment_id INTEGER PRIMARY KEY,
                lemmy_comment_id INTEGER NOT NULL UNIQUE,
                comment_user TEXT,
                comment_body TEXT,
                updated_at TIMESTAMP DEFAULT NULL
            )
            """
        )
        cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS last_updated (
                id INTEGER PRIMARY KEY,
                last_updated_time TIMESTAMP
            );
            """
        )
        conn.commit()
    except Exception:
        # Do not leak the handle when schema creation fails.
        conn.close()
        raise
    return conn
def get_last_updated_time() -> str:
    """Return the timestamp stored in row 1 of ``last_updated``.

    Returns:
        The stored timestamp, or ``"1970-01-01T00:00:00Z"`` when no timestamp
        has been recorded yet (for example on the first run), so that callers
        receive a usable lower bound instead of a ``TypeError``.
    """
    with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
        row = conn.execute("SELECT last_updated_time FROM last_updated WHERE id = 1").fetchone()
    if row is None or row[0] is None:
        return "1970-01-01T00:00:00Z"
    last_updated_time: str = row[0]
    return last_updated_time
def update_last_updated_time() -> None:
    """Store the current UTC time in row 1 of ``last_updated``, creating the row if needed.

    The value is written as ``YYYY-MM-DDTHH:MM:SSZ`` (whole seconds, explicit
    UTC marker), the timestamp format GitHub documents for its ``since``
    query parameter, which is where this value is later used.
    """
    current_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
        cursor = conn.execute(
            "UPDATE last_updated SET last_updated_time = ? WHERE id = 1", (current_time,)
        )
        if cursor.rowcount == 0:
            # No row yet: this is the first time a timestamp is recorded.
            conn.execute(
                "INSERT INTO last_updated (id, last_updated_time) VALUES (1, ?)", (current_time,)
            )
        conn.commit()
    logging.info("Updated last updated time")
def update_post_time(post_id: int, updated_at: datetime.datetime) -> None:
    """Record ``updated_at`` on the ``posts`` row whose ``lemmy_post_id`` is ``post_id``.

    The timestamp is stored as ``YYYY-MM-DD HH:MM:SS`` text.
    """
    connection: sqlite3.Connection = sqlite3.connect(DB_FILE)
    stamp = updated_at.strftime('%Y-%m-%d %H:%M:%S')
    connection.execute(
        "UPDATE posts SET updated_at = ? WHERE lemmy_post_id = ?",
        (stamp, post_id),
    )
    connection.commit()
    connection.close()
def check_updated_at(issue_number: int) -> Optional[Tuple[int, str, str, Optional[str]]]:
    """Look up the stored Lemmy post for a GitHub issue number.

    Returns:
        ``(lemmy_post_id, issue_title, issue_body, updated_at)`` from the
        ``posts`` table, or ``None`` when the issue has no row.
    """
    logging.info(f"Checking last post update for {issue_number}")
    connection: sqlite3.Connection = sqlite3.connect(DB_FILE)
    query = "SELECT lemmy_post_id, issue_title, issue_body, updated_at FROM posts WHERE issue_number = ?"
    row: Optional[Tuple[int, str, str, Optional[str]]] = connection.execute(query, (issue_number,)).fetchone()
    connection.close()
    if row is not None:
        logging.info(f"Found post for {issue_number}")
        return row
    logging.info(f"No post found for {issue_number}")
    return None
@handle_errors("Error initializing Lemmy instance")
def initialize_lemmy_instance() -> Lemmy:
    """Create a ``Lemmy`` client for ``LEMMY_INSTANCE_URL`` and log in with the configured account.

    Returns:
        The client object after ``log_in`` has been called on it.
    """
    logging.info("Initializing Lemmy instance")
    client = Lemmy(LEMMY_INSTANCE_URL)
    logging.info(f"Initialized Lemmy instance in {LEMMY_INSTANCE_URL}")
    # NOTE(review): the result of log_in is not inspected - confirm whether it reports failure.
    client.log_in(LEMMY_USERNAME, LEMMY_PASSWORD)
    logging.info(f"Logged in to Lemmy instance with user {LEMMY_USERNAME}")
    return client
@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def fetch_github_data(url: str) -> List[Dict[str, Any]]:
    """Fetch ``url`` from the GitHub API and return the decoded JSON body.

    Requests are spaced at least ``DELAY`` seconds apart. Failures listed in
    the ``backoff.on_exception`` decorator above are retried.

    Args:
        url: Full GitHub API URL to request.

    Returns:
        The decoded JSON payload. Callers in this file use it both as a list
        of objects and, for single-resource URLs, as one object.

    Raises:
        requests.exceptions.RequestException: On connection problems, timeouts
            or HTTP error statuses (logged before propagating).
    """
    global LAST_REQUEST_TIME
    headers = {
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {PERSONAL_ACCESS_TOKEN}",
        "X-GitHub-Api-Version": "2022-11-28",
    }
    # LAST_REQUEST_TIME is not assigned anywhere else in this file, so fall back
    # to 0.0 ("never requested") instead of failing with NameError on first use.
    previous_request_time: float = globals().get("LAST_REQUEST_TIME", 0.0)
    required_delay = max(0, DELAY - (time.time() - previous_request_time))
    time.sleep(required_delay)
    try:
        # A timeout keeps a stalled connection from blocking the run indefinitely.
        response = requests.get(url, headers=headers, timeout=30)
        LAST_REQUEST_TIME = time.time()
        # Surface HTTP errors as exceptions rather than returning the error
        # document as if it were data.
        response.raise_for_status()
        logging.info(f"Fetched data from {url}")
        res: List[Dict[str, Any]] = response.json()
        return res
    except requests.exceptions.RequestException as e:
        logging.exception(f"Error fetching data from {url}\n{e}")
        raise
def check_existing_post(issue_number: str) -> Optional[int]:
    """Return the Lemmy post id stored for ``issue_number``, or ``None`` when there is no row.

    The database connection is closed before returning, including when the
    query raises.
    """
    SQL = "SELECT lemmy_post_id FROM posts WHERE issue_number=?"
    with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
        row: Optional[Tuple[int]] = conn.execute(SQL, (issue_number,)).fetchone()
    if row:
        return row[0]
    return None
def insert_post_to_db(issue: GitHubIssue, lemmy_post_id: Optional[int]) -> None:
    """Insert the issue-to-Lemmy-post mapping into the ``posts`` table.

    Args:
        issue: The mirrored issue; its number, title, formatted body and
            update time are stored.
        lemmy_post_id: Id of the Lemmy post created for the issue.

    Raises:
        sqlite3.Error: If the insert fails (logged before propagating). The
            connection is closed in every case.
    """
    SQL = "INSERT INTO posts (issue_number, lemmy_post_id, issue_title, issue_body, updated_at) VALUES (?, ?, ?, ?, ?)"
    try:
        # Store the timestamp as explicit text in the same format the rest of
        # this file writes and parses, rather than relying on sqlite3's
        # implicit datetime adapter.
        updated_at = issue.updated_at.strftime('%Y-%m-%d %H:%M:%S')
        with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
            conn.execute(SQL, (issue.number, lemmy_post_id, issue.title, issue.formatted_body, updated_at))
            conn.commit()
        logging.info(f"Inserted new Lemmy post {lemmy_post_id} into the database")
    except sqlite3.Error as e:
        logging.exception(f"Error inserting post into the database for issue {issue.title} with url {issue.url}\n{e}")
        raise
def insert_comment_to_database(cursor: sqlite3.Cursor, github_comment_id: int, lemmy_comment_id: int, comment: GitHubComment) -> None:
    """Record a mirrored comment in the ``comments`` table using ``cursor``.

    Failures are logged and swallowed. Committing is left to the owner of the
    cursor's connection.
    """
    statement = "INSERT INTO comments (github_comment_id, lemmy_comment_id, comment_user, comment_body) VALUES (?, ?, ?, ?)"
    try:
        row = (github_comment_id, lemmy_comment_id, comment.user, comment.formatted_comment)
        cursor.execute(statement, row)
        logging.info(f"Inserted comment {github_comment_id} into the database")
    except Exception as e:
        logging.exception(f"Error encountered while inserting comment {github_comment_id} to database\n{e}")
@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def create_lemmy_post(lemmy: Any, community_id: int, issue: GitHubIssue) -> Optional[int]:
    """Create a Lemmy post for ``issue`` in ``community_id`` and return the new post id.

    The post uses the issue's title, URL and raw body. Failures listed in the
    ``backoff.on_exception`` decorator above are retried.
    """
    response = lemmy.post.create(community_id, issue.title, url=issue.url, body=issue.body)
    # NOTE(review): TypeError is among the retried exceptions - presumably because
    # the response can be None when the request fails; confirm against pythorhead.
    lemmy_post_id: Optional[int] = response["post_view"]["post"]["id"]
    logging.info(f"Posted issue {LEMMY_INSTANCE_URL}/post/{lemmy_post_id}")
    return lemmy_post_id
@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def create_lemmy_comment(lemmy: Any, post_id: Optional[int], comment: GitHubComment) -> Optional[int]:
    """Post ``comment`` under the Lemmy post ``post_id``.

    Returns:
        The id of the created Lemmy comment, or ``None`` when ``post_id`` is
        empty and nothing was posted.
    """
    logging.info(f"Creating new Lemmy comment in {LEMMY_INSTANCE_URL}/post/{post_id}")
    if post_id:
        created = lemmy.comment.create(post_id, comment.formatted_comment)
        lemmy_comment_id: int = created["comment_view"]["comment"]["id"]
        logging.info(f"Successfully created Lemmy comment {LEMMY_INSTANCE_URL}/comment/{lemmy_comment_id}")
        return lemmy_comment_id
    logging.warning("Post ID is empty. Skipping comment creation")
    return None
def get_total_issues(github_repo: str) -> int:
    """Return the ``open_issues_count`` field of the repository's GitHub API record.

    Args:
        github_repo: Repository in ``owner/name`` form.
    """
    # Build the URL from GITHUB_API_BASE like the other request builders in this
    # file, instead of hard-coding the host.
    url: str = f"{GITHUB_API_BASE}/repos/{github_repo}"
    # This endpoint is read as a single object, not as a list of objects.
    data: Any = fetch_github_data(url)
    total_issues: int = data["open_issues_count"]
    return total_issues
def fetch_issues(github_repo: str, last_updated_time: str) -> Generator[Dict[str, Any], None, None]:
    """Yield every issue of ``github_repo`` updated since ``last_updated_time``, page by page.

    Args:
        github_repo: Repository in ``owner/name`` form.
        last_updated_time: Timestamp passed as the ``since`` query parameter.

    Yields:
        One issue object per item returned by the issues endpoint (all states).

    Raises:
        ValueError: If a page decodes to something other than a list, e.g. an
            API error object.
    """
    per_page = 100
    issues_url = (f"{GITHUB_API_BASE}/repos/{github_repo}/issues?state=all&since={last_updated_time}&per_page={per_page}")
    page = 1
    while True:
        issues: List[Dict[str, Any]] = fetch_github_data(f"{issues_url}&page={page}")
        if not issues:
            break
        if not isinstance(issues, list):
            # Iterating a dict would hand its string keys to callers as if they were issues.
            raise ValueError(f"Unexpected response while listing issues for {github_repo}: {issues!r}")
        yield from issues
        if len(issues) < per_page:
            # A short page is the last one; skip the extra request for an empty page.
            break
        page += 1
@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def edit_lemmy_post(lemmy: Any, lemmy_post_id: int, issue: GitHubIssue) -> None:
    """Overwrite the name, URL and body of Lemmy post ``lemmy_post_id`` with the issue's current values.

    Failures listed in the ``backoff.on_exception`` decorator above are retried.
    """
    updated_fields = {"name": issue.title, "url": issue.url, "body": issue.body}
    lemmy.post.edit(lemmy_post_id, **updated_fields)
def process_issues(lemmy: Any, community_id: int, github_repo: str) -> None:
    """Mirror every issue of ``github_repo`` that changed since the stored timestamp.

    The previous timestamp is read first, then the stored timestamp is
    advanced to "now", and only afterwards are the issues fetched and
    processed one by one.
    """
    since = get_last_updated_time()
    # NOTE(review): the timestamp is advanced before any issue is processed, and it
    # lives in one shared row; a failed run, or the next repository in the same
    # run, appears to start from the new value - confirm this is intended.
    update_last_updated_time()
    for payload in fetch_issues(github_repo, since):
        process_issue(lemmy, community_id, github_repo, payload)
def process_issue(lemmy: Any, community_id: int, github_repo: str, issue_dict: dict[str, Any]) -> None:
    """Create or refresh the Lemmy post that mirrors one GitHub issue.

    An issue without a stored post gets a new post. An issue with a stored
    post is refreshed (post edit, comment sync, stored update time) only when
    no update time is stored or ``has_enough_time_passed`` says so.
    """
    issue: GitHubIssue = GitHubIssue(issue_dict, github_repo)
    stored: Optional[Tuple[int, str, str, Optional[str]]] = check_updated_at(issue.number)
    if stored is None:
        create_new_lemmy_post(lemmy, community_id, github_repo, issue)
        return

    lemmy_post_id, existing_title, existing_body, updated_at = stored
    refresh_due = updated_at is None or has_enough_time_passed(updated_at, issue.updated_at)
    if not refresh_due:
        return
    update_issue_if_needed(lemmy, lemmy_post_id, existing_title, existing_body, issue)
    process_comments(lemmy, lemmy_post_id, github_repo, issue)
    update_post_time(lemmy_post_id, issue.updated_at)
def has_enough_time_passed(old_updated_at_str: str, new_updated_at: datetime.datetime) -> bool:
    """Tell whether ``new_updated_at`` is at least two hours after the stored time.

    Args:
        old_updated_at_str: Stored time as ``YYYY-MM-DD HH:MM:SS`` text.
        new_updated_at: Time to compare against the stored one.
    """
    previous = datetime.datetime.strptime(old_updated_at_str, '%Y-%m-%d %H:%M:%S')
    earliest_refresh = previous + datetime.timedelta(hours=2)
    return new_updated_at >= earliest_refresh
def update_issue_if_needed(lemmy: Any, lemmy_post_id: int, existing_title: str, existing_body: str, issue: GitHubIssue) -> None:
    """Edit the Lemmy post when the stored title or body differs from the issue's current one."""
    unchanged = existing_title == issue.title and existing_body == issue.formatted_body
    if unchanged:
        return
    edit_lemmy_post(lemmy, lemmy_post_id, issue)
def create_new_lemmy_post(lemmy: Any, community_id: int, github_repo: str, issue: GitHubIssue) -> None:
    """Post ``issue`` to Lemmy, record the mapping in the database, then mirror its comments.

    When the post could not be created, the issue is skipped: nothing is
    written to the database and no comments are mirrored.
    """
    lemmy_post_id: Optional[int] = post_issue_to_lemmy(lemmy, community_id, issue)
    if lemmy_post_id is None:
        # Without a post id there is nothing to record or to attach comments to;
        # skipping here lets the remaining issues of the run still be processed.
        logging.warning(f"Skipping database insert and comments for issue {issue.url}: Lemmy post was not created")
        return
    insert_post_to_db(issue, lemmy_post_id)
    process_comments(lemmy, lemmy_post_id, github_repo, issue)
def post_issue_to_lemmy(lemmy: Any, community_id: int, issue: GitHubIssue) -> Optional[int]:
    """Create the Lemmy post for ``issue``, turning any failure into a ``None`` result.

    Returns:
        The new Lemmy post id, or ``None`` when posting raised (the error is logged).
    """
    try:
        logging.info(f"Start posting issue {issue.title} to community {community_id}")
        return create_lemmy_post(lemmy, community_id, issue)
    except Exception as e:
        logging.exception(f"Error posting issue {issue.title} to community {community_id}\n{e}")
    return None
def process_comments(lemmy: Any, post_id: Optional[int], github_repo: str, issue: GitHubIssue) -> None:
    """Mirror all comments of ``issue`` onto the Lemmy post ``post_id``.

    Comments are fetched page by page so that issues with more comments than
    fit in one response are mirrored completely. Any failure is logged and
    swallowed so one issue cannot abort the run.
    """
    per_page = 100
    try:
        logging.info(f"Posting comments from issue #{issue.number} to Lemmy post {LEMMY_INSTANCE_URL}/post/{post_id}")
        comments_url: str = f"{GITHUB_API_BASE}/repos/{github_repo}/issues/{issue.number}/comments"
        page = 1
        while True:
            comments: Any = fetch_github_data(f"{comments_url}?per_page={per_page}&page={page}")
            if not comments:
                break
            for comment_data in comments:
                # String items show up when the payload is an object rather than
                # a list (iterating a dict yields its keys); skip them.
                if isinstance(comment_data, str):
                    logging.warning(f"Skipping comment {comment_data}")
                    continue
                process_comment(lemmy, github_repo, comment_data, post_id, issue.number)
            if len(comments) < per_page:
                # A short page is the last one.
                break
            page += 1
    except Exception as e:
        logging.exception(f"Error posting comments to lemmy post {post_id}\n{e}")
def process_comment(lemmy: Any, github_repo: str, comment_data: Dict[str, Any], post_id: Optional[int], issue_number: int) -> None:
    """Mirror one GitHub comment to Lemmy unless it is already recorded in the database.

    The database connection is closed on every path, including the early
    return for an already-mirrored comment and any raised error.
    """
    comment = GitHubComment(comment_data, issue_number)
    with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
        cursor: sqlite3.Cursor = conn.cursor()
        existing_comment_id: Optional[int] = get_existing_comment_id(cursor, comment.id)
        if existing_comment_id:
            logging.info(f"Skipping existing comment with GitHub comment ID: {comment.id}")
            return
        post_comment_to_lemmy(cursor, lemmy, github_repo, comment, post_id, issue_number)
        conn.commit()
def post_comment_to_lemmy(cursor: sqlite3.Cursor, lemmy: Any, github_repo: str, comment: GitHubComment, post_id: Optional[int], issue_number: int) -> None:
    """Create the Lemmy comment for ``comment`` and record the mapping through ``cursor``.

    When no Lemmy comment id comes back, the failure is logged and nothing is
    written to the database.
    """
    lemmy_post_url = f"{LEMMY_INSTANCE_URL}/post/{post_id}"
    comment_url = f"{GITHUB_URL_BASE}/{github_repo}/issues/{issue_number}#issuecomment-{comment.id}"
    logging.info(f"Posting comment {comment.url} to Lemmy post {lemmy_post_url}")
    lemmy_comment_id: Optional[int] = create_lemmy_comment(lemmy, post_id, comment)
    if not lemmy_comment_id:
        # No exception is being handled here, so use error(): exception() would
        # append a meaningless empty traceback to the log.
        logging.error(f"Error creating Lemmy comment {lemmy_comment_id} to {lemmy_post_url} from Github comment {comment.url}")
        return
    logging.info(f"Posted comment {comment_url} to Lemmy post {lemmy_post_url}")
    insert_comment_to_database(cursor, comment.id, lemmy_comment_id, comment)
def get_existing_comment_id(cursor: sqlite3.Cursor, github_comment_id: int) -> Optional[int]:
    """Return the Lemmy comment id recorded for ``github_comment_id``, or ``None`` if there is none."""
    logging.info(f"Checking if comment with GitHub comment ID: {github_comment_id} exists")
    cursor.execute("SELECT lemmy_comment_id FROM comments WHERE github_comment_id=?", (github_comment_id,))
    row = cursor.fetchone()
    if row is None:
        logging.info(f"No existing comment found with GitHub comment ID: {github_comment_id}")
        return None
    logging.info(f"Found existing comment with GitHub comment ID: {github_comment_id}")
    lemmy_comment_id: int = row[0]
    return lemmy_comment_id
def fetch_issue_data(github_repo: str) -> List[Tuple[str, Optional[int]]]:
    """Return ``(issue_url, lemmy_post_id)`` pairs for the stored posts of ``github_repo``.

    The database connection is closed before returning, including when the
    query raises.
    """
    logging.info("Fetching the GitHub issue number and Lemmy post ID for all issues")
    # NOTE(review): the posts table created by initialize_database in this file has
    # no issue_url column, so against such a database this query is expected to
    # fail with sqlite3.OperationalError - confirm the intended schema.
    SQL = "SELECT issue_url, lemmy_post_id FROM posts WHERE issue_url LIKE ?"
    # Built from GITHUB_URL_BASE, like the other GitHub links in this file.
    issues_url = f"{GITHUB_URL_BASE}/{github_repo}/issues/%"
    with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
        issue_data: List[Tuple[str, Optional[int]]] = conn.execute(SQL, (issues_url,)).fetchall()
    logging.info(f"Fetched {len(issue_data)} issues")
    return issue_data
def process_repo(lemmy: Any, community_id: int, github_repo: str) -> None:
    """Process one repository, logging (not propagating) any error so other repositories still run."""
    try:
        logging.info(f"Processing repository {github_repo}")
        process_issues(lemmy, community_id, github_repo)
    except Exception as error:
        logging.exception(f"Error occurred while processing repository {github_repo}\n{error}")
def main() -> None:
    """Run one full synchronisation pass over every configured repository.

    Ensures the database schema exists, logs in to Lemmy, resolves the target
    community and then processes each entry of ``REPOSITORIES`` in order.
    """
    logging.info("Running main function")
    # initialize_database returns an open connection that nothing here uses;
    # close it so repeated runs do not accumulate open handles.
    initialize_database().close()
    lemmy = initialize_lemmy_instance()
    community_id = lemmy.discover_community(LEMMY_COMMUNITY_NAME)
    for github_repo in REPOSITORIES:
        process_repo(lemmy, community_id, github_repo)
def run_periodically() -> None:
    """Register ``main`` as an hourly job and poll the scheduler once a minute, forever."""
    logging.info("Starting periodic run")
    hourly = schedule.every(1).hours
    hourly.do(main)
    while True:
        try:
            schedule.run_pending()
        except Exception as error:
            # Keep the loop alive when a scheduled run fails.
            logging.exception(f"Error occurred during scheduling\n{error}")
        time.sleep(60)
# Script entry point: run one synchronisation pass immediately, then hand over
# to the scheduler loop in run_periodically().
if __name__ == "__main__":
    try:
        logging.info("Starting script")
        main()
        run_periodically()
    except Exception as e:
        # Last-resort handler: record the failure in the log.
        logging.exception(f"Error occurred during script execution\n{e}")
requirements.txt
pythorhead==0.12.3
schedule==1.2.0
backoff==2.2.1
feedparser==6.0.10