CoderSupreme

joined 1 year ago
4
Permanently Deleted (programming.dev)
submitted 10 months ago* (last edited 6 months ago) by [email protected] to c/[email protected]
 

Permanently Deleted

-1
... (programming.dev)
submitted 11 months ago* (last edited 6 months ago) by [email protected] to c/[email protected]
15
... (survey.stackoverflow.co)
submitted 11 months ago* (last edited 6 months ago) by [email protected] to c/[email protected]
34
Permanently Deleted (programming.dev)
submitted 11 months ago* (last edited 6 months ago) by [email protected] to c/[email protected]
 

Permanently Deleted

39
... (programming.dev)
submitted 11 months ago* (last edited 6 months ago) by [email protected] to c/[email protected]
4
... (programming.dev)
submitted 11 months ago* (last edited 6 months ago) by [email protected] to c/[email protected]
 

The instance where I was using it changed its rules to prevent bots from posting in it, and I didn't care enough to search for another instance.

https://lemm.ee/c/issue_tracker?dataType=Post&page=1&sort=Active

`config_template.py

# Base URL of the Lemmy instance the bot logs in to (also used to build post/comment links in log messages).
LEMMY_INSTANCE_URL = ""
# Name of the Lemmy community that receives the reposted issues.
LEMMY_COMMUNITY_NAME = ""
# Credentials of the Lemmy account used for logging in.
LEMMY_USERNAME = ""
LEMMY_PASSWORD = ""
# Base URL for GitHub REST API requests.
GITHUB_API_BASE = "https://api.github.com"
# Base URL used when building browser links to GitHub comments.
GITHUB_URL_BASE = "https://github.com"
# "owner/name" repositories whose issues are processed, in order.
REPOSITORIES = ["LemmyNet/lemmy", "LemmyNet/lemmy-ui"]
# Path of the SQLite database file that tracks reposted issues and comments.
DB_FILE = "lemmy_github.db"
# Minimum number of seconds between two consecutive GitHub API requests.
DELAY = 1
# Passed to the backoff decorators as max_time (seconds) for their retries.
MAX_BACKOFF_TIME = 300
# GitHub personal access token, sent as a Bearer token in the Authorization header.
PERSONAL_ACCESS_TOKEN = ""

github_lemmy_issue_reposter.py

import datetime
import functools
import logging
import sqlite3
import time
from typing import Any, Dict, Generator, List, Optional, Tuple, Callable, TypeVar

import backoff
import requests
import schedule
from pythorhead import Lemmy

from config import *

# Generic return type used in the annotations of the handle_errors decorator.
T = TypeVar('T')

# More verbose alternative format, kept for reference:
# "[%(levelname)s]:%(asctime)s:%(name)s [%(filename)s:%(lineno)s - %(funcName)s()] %(message)s"
FORMAT = "%(message)s"
# Send INFO-and-above records to both debug.log (opened with mode="w", so it is
# truncated on every start) and a stream handler.
logging.basicConfig(
    level=logging.INFO,
    format=FORMAT,
    handlers=[logging.FileHandler("debug.log", mode="w"), logging.StreamHandler()],
)


def on_giveup(details: Dict[str, int]) -> None:
    """Log an error once a backoff-decorated call has run out of retries.

    Args:
        details: Mapping handed over by the ``on_giveup`` hook of the backoff
            decorators; only its ``"tries"`` entry is read.
    """
    attempts = details['tries']
    logging.error(f"Failed to fetch issues after {attempts} attempts", exc_info=True)


def handle_errors(message: Optional[str] = None) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Build a decorator that logs any exception raised by a function, then re-raises it.

    Args:
        message: Optional context that is prepended to the log entry.

    Returns:
        A decorator. The decorated function returns whatever the original
        returns; if the original raises, the exception is logged with its
        traceback and then propagates unchanged.
    """
    def decorator(function: Callable[..., T]) -> Callable[..., T]:
        prefix = f"{message} - " if message else ""

        # functools.wraps keeps the wrapped function's __name__ and __doc__, so
        # introspection and log output still refer to the real function
        # instead of "wrapper".
        @functools.wraps(function)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            try:
                return function(*args, **kwargs)
            except Exception as e:
                logging.exception(f"{prefix}Error in {function.__name__}:\n{e}")
                raise

        return wrapper

    return decorator


class GitHubIssue:
    """A GitHub issue parsed into the fields needed to repost it on Lemmy."""

    def __init__(self, issue_dict: dict[str, Any], github_repo: str) -> None:
        """Parse an issue dictionary.

        Args:
            issue_dict: Issue data; the keys ``html_url``, ``state``, ``title``,
                ``number``, ``body``, ``user`` (with ``login`` and ``html_url``)
                and ``updated_at`` are read.
            github_repo: Repository name; a name containing ``"lemmy-ui"`` is
                tagged ``[UI]``, anything else ``[BE]``.

        Raises:
            Exception: Whatever the parsing raised (for example ``KeyError`` for
                a missing key), re-raised after the partially parsed state has
                been logged.
        """
        # Pre-seed every attribute so that the failure log below can render a
        # partially parsed issue instead of raising AttributeError itself and
        # hiding the original error.
        self.url: Optional[str] = None
        self.state: Optional[str] = None
        self.state_fmt: str = ""
        self.repo_abbr: str = ""
        self.title: Optional[str] = None
        self.body: Optional[str] = None
        self.user: Optional[str] = None
        self.user_url: Optional[str] = None
        self.updated_at: Optional[datetime.datetime] = None
        try:
            self.url = issue_dict["html_url"]
            logging.info(f"Creating issue {self.url}")
            self.state = issue_dict["state"]
            self.state_fmt = "[Closed]" if issue_dict["state"] == "closed" else ""
            self.repo_abbr = "[UI]" if "lemmy-ui" in github_repo else "[BE]"
            self.title = f"{self.state_fmt}{self.repo_abbr} {issue_dict['title']} #{issue_dict['number']}"
            # Titles are capped at 200 characters.
            self.title = self.title[:200]
            self.body = issue_dict["body"]
            if self.body is not None:
                # Bodies are capped at 30000 characters.
                self.body = self.body[:30000]
            self.user = issue_dict["user"]["login"]
            self.user_url = issue_dict["user"]["html_url"]
            self.updated_at = datetime.datetime.strptime(issue_dict["updated_at"], '%Y-%m-%dT%H:%M:%SZ')
        except Exception:
            log_message: str = (
                f"Formatted issue:\n"
                f"  - Repo: {github_repo}\n"
                f"  - Issue State: {self.state}\n"
                f"  - Repo Abbreviation: {self.repo_abbr}\n"
                f"  - Title: {self.title}\n"
                f"  - URL: {self.url}\n"
                f"  - User: {self.user}\n"
                f"  - User URL: {self.user_url}\n"
                f"  - Updated At: {self.updated_at}\n"
            )
            # logging.exception appends the traceback of the active exception.
            logging.exception(log_message)
            # A half-initialised issue is unusable, so let the caller see the
            # real parsing error.
            raise

    @property
    def formatted_body(self) -> Optional[str]:
        """Return the body as a Markdown quote with an attribution line.

        Returns ``None`` when the issue has no body.
        """
        formatted_body: Optional[str] = self.body
        try:
            if self.body is not None:
                formatted_body = self.body.replace("\n", "\n> ")
                formatted_body = f"> {formatted_body}\n> \n> *Originally posted by [{self.user}]({self.user_url}) in [#{self.number}]({self.url})*"
        except Exception as e:
            # Best effort: fall back to whatever was formatted so far.
            logging.exception(f"Error formatting body for {self.url}\n{e}")
        return formatted_body

    @property
    def number(self) -> int:
        """Return the issue number, taken from the last path segment of the issue URL."""
        return int(self.url.split("/")[-1])


class GitHubComment:
    """A GitHub issue comment together with the number of the issue it belongs to."""

    def __init__(self, comment_dict: dict[str, Any], issue_number: int) -> None:
        """Copy the id, body, author and URL out of ``comment_dict``."""
        self.id = comment_dict["id"]
        self.body = comment_dict["body"]
        author = comment_dict["user"]
        self.user = author["login"]
        self.user_url = author["html_url"]
        self.url = comment_dict["html_url"]
        self.issue_number = issue_number

    @property
    def formatted_comment(self) -> str:
        """Return the comment as a Markdown quote followed by an attribution line."""
        # Re-joining the lines with "\n> " prefixes every line after the first;
        # the first line gets its "> " from the template below.
        quoted = "\n> ".join(self.body.split("\n"))
        attribution = f"*Originally posted by [{self.user}]({self.user_url}) in [#{self.issue_number}]({self.url})*"
        return f"> {quoted}\n> \n> {attribution}"


@handle_errors("Error initializing database")
def initialize_database() -> sqlite3.Connection:
    """Open the SQLite database at ``DB_FILE`` and create any missing tables.

    Creates the ``posts``, ``comments`` and ``last_updated`` tables if they do
    not exist yet and commits.

    Returns:
        The open connection; closing it is the caller's responsibility.
    """
    logging.info("Initializing database")
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    cursor: sqlite3.Cursor = conn.cursor()
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS posts (
            issue_number INTEGER PRIMARY KEY,
            lemmy_post_id INTEGER NOT NULL UNIQUE,
            issue_title TEXT,
            issue_body TEXT,
            updated_at TIMESTAMP DEFAULT NULL
        )
    """
    )
    # The comma after "comment_body TEXT" matters: without it SQLite reads
    # "TEXT updated_at TIMESTAMP" as one type name and never creates the
    # updated_at column. Databases created earlier keep their old layout
    # because of IF NOT EXISTS.
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS comments (
            github_comment_id INTEGER PRIMARY KEY,
            lemmy_comment_id INTEGER NOT NULL UNIQUE,
            comment_user TEXT,
            comment_body TEXT,
            updated_at TIMESTAMP DEFAULT NULL
        )
    """
    )
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS last_updated (
            id INTEGER PRIMARY KEY,
            last_updated_time TIMESTAMP
        );
    """
    )
    conn.commit()
    return conn


def get_last_updated_time() -> str:
    """Return the timestamp stored in row ``id = 1`` of the ``last_updated`` table.

    Returns:
        The stored timestamp, or ``"1970-01-01T00:00:00Z"`` when no timestamp
        has been recorded yet (for example on the very first run), so that the
        caller starts from the beginning instead of crashing.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT last_updated_time FROM last_updated WHERE id = 1")
        row = cursor.fetchone()
    finally:
        # Release the connection even if the query fails.
        conn.close()

    # fetchone() yields None when the row does not exist yet.
    if row is None or row[0] is None:
        return "1970-01-01T00:00:00Z"

    last_updated_time: str = row[0]
    return last_updated_time


def update_last_updated_time() -> None:
    """Store the current UTC time in row ``id = 1`` of the ``last_updated`` table.

    The row is updated when present and inserted otherwise. The timestamp is
    written as ``YYYY-MM-DDTHH:MM:SSZ`` because the stored value is later
    passed as the ``since`` parameter of GitHub issue requests, which
    documents exactly that format.
    """
    # datetime.utcnow() is deprecated; build an aware UTC timestamp instead.
    current_time = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        cursor.execute("UPDATE last_updated SET last_updated_time = ? WHERE id = 1", (current_time,))
        # rowcount == 0 means the row does not exist yet, so create it.
        if cursor.rowcount == 0:
            cursor.execute("INSERT INTO last_updated (id, last_updated_time) VALUES (1, ?)", (current_time,))
        conn.commit()
    finally:
        # Release the connection even if a statement fails.
        conn.close()
    logging.info("Updated last updated time")


def update_post_time(post_id: int, updated_at: datetime.datetime) -> None:
    """Record ``updated_at`` for the post whose ``lemmy_post_id`` equals ``post_id``.

    The timestamp is stored as ``YYYY-MM-DD HH:MM:SS`` text.
    """
    connection: sqlite3.Connection = sqlite3.connect(DB_FILE)
    db_cursor: sqlite3.Cursor = connection.cursor()
    db_cursor.execute(
        "UPDATE posts SET updated_at = ? WHERE lemmy_post_id = ?",
        (updated_at.strftime('%Y-%m-%d %H:%M:%S'), post_id),
    )
    connection.commit()
    connection.close()


def check_updated_at(issue_number: int) -> Optional[Tuple[int, str, str, Optional[str]]]:
    """Look up the stored post row for ``issue_number``.

    Returns:
        ``(lemmy_post_id, issue_title, issue_body, updated_at)`` when a row
        exists in ``posts``, otherwise ``None``.
    """
    logging.info(f"Checking last post update for {issue_number}")
    connection: sqlite3.Connection = sqlite3.connect(DB_FILE)
    db_cursor: sqlite3.Cursor = connection.cursor()
    db_cursor.execute(
        "SELECT lemmy_post_id, issue_title, issue_body, updated_at FROM posts WHERE issue_number = ?",
        (issue_number,),
    )
    row: Optional[Tuple[int, str, str, Optional[str]]] = db_cursor.fetchone()
    connection.close()

    if row is not None:
        logging.info(f"Found post for {issue_number}")
        return row

    logging.info(f"No post found for {issue_number}")
    return None


@handle_errors("Error initializing Lemmy instance")
def initialize_lemmy_instance() -> Lemmy:
    """Create a ``Lemmy`` client for ``LEMMY_INSTANCE_URL`` and log in with the configured account.

    Returns:
        The client after ``log_in`` has been called on it.
    """
    logging.info("Initializing Lemmy instance")
    client = Lemmy(LEMMY_INSTANCE_URL)
    logging.info(f"Initialized Lemmy instance in {LEMMY_INSTANCE_URL}")

    client.log_in(LEMMY_USERNAME, LEMMY_PASSWORD)
    logging.info(f"Logged in to Lemmy instance with user {LEMMY_USERNAME}")

    return client


# time.time() value of the most recent GitHub request. It must exist before the
# first call, otherwise reading it in fetch_github_data raises NameError.
# 0.0 means "no request made yet", so the first request is not delayed.
LAST_REQUEST_TIME = 0.0


@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def fetch_github_data(url: str) -> List[Dict[str, Any]]:
    """GET ``url`` from GitHub and return the decoded JSON body.

    Requests are spaced at least ``DELAY`` seconds apart, and the call is
    retried with exponential backoff on request errors and ``TypeError``.

    Args:
        url: Full URL to fetch.

    Returns:
        The parsed JSON response.

    Raises:
        requests.exceptions.RequestException: When the request keeps failing
            (logged before being re-raised).

    NOTE(review): the HTTP status code is not checked here, so an error
    payload is returned to the caller like any other response.
    """
    global LAST_REQUEST_TIME
    try:
        headers = {
            "Accept": "application/vnd.github+json",
            "Authorization": f"Bearer {PERSONAL_ACCESS_TOKEN}",
            "X-GitHub-Api-Version": "2022-11-28",
        }
        # Sleep only for the part of DELAY that has not already elapsed.
        time_elapsed = time.time() - LAST_REQUEST_TIME
        required_delay = max(0, DELAY - time_elapsed)
        time.sleep(required_delay)
        # Without a timeout a stalled connection would block the bot forever;
        # a timeout surfaces as a RequestException and is retried by backoff.
        response = requests.get(url, headers=headers, timeout=30)
        LAST_REQUEST_TIME = time.time()
        logging.info(f"Fetched data from {url}")
        res: List[Dict[str, Any]] = response.json()
        return res
    except requests.exceptions.RequestException as e:
        logging.exception(f"Error fetching data from {url}\n{e}")
        raise


def check_existing_post(issue_number: str) -> Optional[int]:
    """Return the Lemmy post id stored for ``issue_number``, or ``None`` if there is none.

    Args:
        issue_number: Issue number to look up in the ``posts`` table.
    """
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    try:
        cursor: sqlite3.Cursor = conn.cursor()
        SQL = "SELECT lemmy_post_id FROM posts WHERE issue_number=?"
        cursor.execute(SQL, (issue_number,))
        post_id: Optional[tuple[int]] = cursor.fetchone()
    finally:
        # The connection used to be left open on every call.
        conn.close()
    if post_id:
        return post_id[0]
    return None


def insert_post_to_db(issue: GitHubIssue, lemmy_post_id: Optional[int]) -> None:
    """Insert a row into ``posts`` that links ``issue`` to its Lemmy post.

    Args:
        issue: The issue that was posted.
        lemmy_post_id: Id of the Lemmy post created for it.

    Raises:
        sqlite3.Error: When the insert fails (logged before being re-raised),
            for example when ``lemmy_post_id`` is ``None`` and the NOT NULL
            constraint rejects the row.
    """
    # Store the timestamp as explicit "YYYY-MM-DD HH:MM:SS" text, the same
    # layout update_post_time writes and has_enough_time_passed parses, rather
    # than relying on sqlite3's deprecated implicit datetime adapter.
    updated_at_text = issue.updated_at.strftime('%Y-%m-%d %H:%M:%S')
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    try:
        cursor: sqlite3.Cursor = conn.cursor()
        SQL = "INSERT INTO posts (issue_number, lemmy_post_id, issue_title, issue_body, updated_at) VALUES (?, ?, ?, ?, ?)"
        cursor.execute(SQL, (issue.number, lemmy_post_id, issue.title, issue.formatted_body, updated_at_text))
        conn.commit()
        logging.info(f"Inserted new Lemmy post {lemmy_post_id} into the database")
    except sqlite3.Error as e:
        logging.exception(f"Error inserting post into the database for issue {issue.title} with url {issue.url}\n{e}")
        raise
    finally:
        # Release the connection on success and on failure alike.
        conn.close()


def insert_comment_to_database(cursor: sqlite3.Cursor, github_comment_id: int, lemmy_comment_id: int, comment: GitHubComment) -> None:
    """Insert a row into ``comments`` that links a GitHub comment to its Lemmy comment.

    The statement runs on the caller's cursor and nothing is committed here.
    Any failure is logged and swallowed.
    """
    insert_sql = "INSERT INTO comments (github_comment_id, lemmy_comment_id, comment_user, comment_body) VALUES (?, ?, ?, ?)"
    try:
        row = (github_comment_id, lemmy_comment_id, comment.user, comment.formatted_comment)
        cursor.execute(insert_sql, row)
        logging.info(f"Inserted comment {github_comment_id} into the database")
    except Exception as error:
        logging.exception(f"Error encountered while inserting comment {github_comment_id} to database\n{error}")


@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def create_lemmy_post(lemmy: Any, community_id: int, issue: GitHubIssue) -> Optional[int]:
    """Create a Lemmy post for ``issue`` in ``community_id`` and return the new post's id.

    The post uses the issue's title, URL and raw body. The call is retried
    with exponential backoff on request errors and ``TypeError``.
    """
    response = lemmy.post.create(community_id, issue.title, url=issue.url, body=issue.body)
    lemmy_post_id: Optional[int] = response["post_view"]["post"]["id"]
    logging.info(f"Posted issue {LEMMY_INSTANCE_URL}/post/{lemmy_post_id}")
    return lemmy_post_id


@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def create_lemmy_comment(lemmy: Any, post_id: Optional[int], comment: GitHubComment) -> Optional[int]:
    """Post ``comment`` under the Lemmy post ``post_id``.

    Returns:
        The id of the created Lemmy comment, or ``None`` when ``post_id`` is
        falsy and nothing was posted.
    """
    logging.info(f"Creating new Lemmy comment in {LEMMY_INSTANCE_URL}/post/{post_id}")

    if post_id:
        created = lemmy.comment.create(post_id, comment.formatted_comment)
        new_comment_id: int = created["comment_view"]["comment"]["id"]
        logging.info(f"Successfully created Lemmy comment {LEMMY_INSTANCE_URL}/comment/{new_comment_id}")
        return new_comment_id

    logging.warning("Post ID is empty. Skipping comment creation")
    return None


def get_total_issues(github_repo: str) -> int:
    """Return the ``open_issues_count`` reported for ``github_repo``.

    Args:
        github_repo: Repository in ``owner/name`` form.
    """
    # Build the URL from GITHUB_API_BASE like every other request in this
    # file, so a changed API host in the configuration is honoured here too.
    url: str = f"{GITHUB_API_BASE}/repos/{github_repo}"
    # The repository endpoint is read as a single mapping, not a list.
    data: Dict[str, Any] = fetch_github_data(url)
    total_issues: int = data["open_issues_count"]
    return total_issues


def fetch_issues(github_repo: str, last_updated_time: str) -> Generator[Dict[str, Any], None, None]:
    """Lazily yield the issues of ``github_repo`` updated since ``last_updated_time``.

    Pages of 100 issues (any state) are requested one after another, and
    iteration stops at the first empty page.
    """
    page_size = 100
    base_url = f"{GITHUB_API_BASE}/repos/{github_repo}/issues?state=all&since={last_updated_time}&per_page={page_size}"

    page_number = 1
    # An empty (falsy) page marks the end of the result set.
    while batch := fetch_github_data(f"{base_url}&page={page_number}"):
        yield from batch
        page_number += 1


@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def edit_lemmy_post(lemmy: Any, lemmy_post_id: int, issue: GitHubIssue) -> None:
    """Overwrite the title, URL and body of an existing Lemmy post with the issue's current values.

    The call is retried with exponential backoff on request errors and
    ``TypeError``.
    """
    post_api = lemmy.post
    post_api.edit(lemmy_post_id, name=issue.title, url=issue.url, body=issue.body)


def process_issues(lemmy: Any, community_id: int, github_repo: str) -> None:
    """Process every issue of ``github_repo`` updated since the stored timestamp.

    The stored timestamp is read first and then immediately advanced to the
    current time, before any issue is fetched.

    NOTE(review): the timestamp appears to be a single value shared by all
    repositories, so a repository processed later in the same run would read
    the value just written here. Confirm whether that is intended.
    """
    since = get_last_updated_time()
    update_last_updated_time()
    for raw_issue in fetch_issues(github_repo, since):
        process_issue(lemmy, community_id, github_repo, raw_issue)


def process_issue(lemmy: Any, community_id: int, github_repo: str, issue_dict: dict[str, Any]) -> None:
    """Create or refresh the Lemmy post that mirrors one GitHub issue.

    An issue without a stored post gets a new one. An existing post is
    refreshed (edit if needed, new comments, stored timestamp) only when it
    has no stored timestamp or ``has_enough_time_passed`` says so.
    """
    issue: GitHubIssue = GitHubIssue(issue_dict, github_repo)
    stored: Optional[Tuple[int, str, str, Optional[str]]] = check_updated_at(issue.number)

    # First sighting of this issue: create the post and stop.
    if stored is None:
        create_new_lemmy_post(lemmy, community_id, github_repo, issue)
        return

    lemmy_post_id, existing_title, existing_body, updated_at = stored

    # A known timestamp that is still too recent means there is nothing to do.
    if updated_at is not None and not has_enough_time_passed(updated_at, issue.updated_at):
        return

    update_issue_if_needed(lemmy, lemmy_post_id, existing_title, existing_body, issue)
    process_comments(lemmy, lemmy_post_id, github_repo, issue)
    update_post_time(lemmy_post_id, issue.updated_at)


def has_enough_time_passed(old_updated_at_str: str, new_updated_at: datetime.datetime) -> bool:
    """Tell whether ``new_updated_at`` is at least two hours after the stored timestamp.

    Args:
        old_updated_at_str: Stored timestamp in ``YYYY-MM-DD HH:MM:SS`` form.
        new_updated_at: The newer timestamp to compare against it.
    """
    minimum_gap = datetime.timedelta(hours=2)
    previous = datetime.datetime.strptime(old_updated_at_str, '%Y-%m-%d %H:%M:%S')
    elapsed: datetime.timedelta = new_updated_at - previous
    return elapsed >= minimum_gap


def update_issue_if_needed(lemmy: Any, lemmy_post_id: int, existing_title: str, existing_body: str, issue: GitHubIssue) -> None:
    """Edit the Lemmy post when the stored title or body differs from the issue's current ones."""
    # Nothing to do when both the title and the formatted body are unchanged.
    if existing_title == issue.title and existing_body == issue.formatted_body:
        return
    edit_lemmy_post(lemmy, lemmy_post_id, issue)


def create_new_lemmy_post(lemmy: Any, community_id: int, github_repo: str, issue: GitHubIssue) -> None:
    """Post ``issue`` to Lemmy, record the new post in the database and mirror its comments.

    When the post could not be created, the issue is skipped: nothing is
    stored and no comments are posted.
    """
    lemmy_post_id: Optional[int] = post_issue_to_lemmy(lemmy, community_id, issue)
    if lemmy_post_id is None:
        # post_issue_to_lemmy returns None on failure. Storing None would
        # violate the NOT NULL constraint on posts.lemmy_post_id, and that
        # error would abort the processing of every remaining issue.
        logging.error(f"Skipping issue {issue.url}: Lemmy post could not be created")
        return
    insert_post_to_db(issue, lemmy_post_id)
    process_comments(lemmy, lemmy_post_id, github_repo, issue)


def post_issue_to_lemmy(lemmy: Any, community_id: int, issue: GitHubIssue) -> Optional[int]:
    """Try to create a Lemmy post for ``issue``.

    Returns:
        The new post's id, or ``None`` when creation failed (the error is
        logged, not raised).
    """
    try:
        logging.info(f"Start posting issue {issue.title} to community {community_id}")
        return create_lemmy_post(lemmy, community_id, issue)
    except Exception as error:
        logging.exception(f"Error posting issue {issue.title} to community {community_id}\n{error}")
        return None


def process_comments(lemmy: Any, post_id: Optional[int], github_repo: str, issue: GitHubIssue) -> None:
    """Fetch the comments of ``issue`` from GitHub and mirror each one under the Lemmy post.

    Entries that arrive as plain strings are skipped with a warning. Any
    error is logged and swallowed so that one issue cannot stop the run.

    NOTE(review): the request carries no paging parameters, so presumably
    only the first page of comments is returned. Confirm against the API.
    """
    try:
        logging.info(f"Posting comments from issue #{issue.number} to Lemmy post {LEMMY_INSTANCE_URL}/post/{post_id}")
        fetched = fetch_github_data(f"{GITHUB_API_BASE}/repos/{github_repo}/issues/{issue.number}/comments")
        for entry in fetched:
            if not isinstance(entry, str):
                process_comment(lemmy, github_repo, entry, post_id, issue.number)
            else:
                logging.warning(f"Skipping comment {entry}")
    except Exception as error:
        logging.exception(f"Error posting comments to lemmy post {post_id}\n{error}")

def process_comment(lemmy: Any, github_repo: str, comment_data: Dict[str, Any], post_id: Optional[int], issue_number: int) -> None:
    """Mirror one GitHub comment to Lemmy unless it is already recorded in the database.

    Args:
        lemmy: Lemmy client used for posting.
        github_repo: Repository the comment belongs to.
        comment_data: Comment dictionary as fetched from GitHub.
        post_id: Id of the Lemmy post to comment on.
        issue_number: Number of the issue the comment belongs to.
    """
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    try:
        cursor: sqlite3.Cursor = conn.cursor()
        comment = GitHubComment(comment_data, issue_number)

        existing_comment_id: Optional[int] = get_existing_comment_id(cursor, comment.id)
        if existing_comment_id:
            logging.info(f"Skipping existing comment with GitHub comment ID: {comment.id}")
            return

        post_comment_to_lemmy(cursor, lemmy, github_repo, comment, post_id, issue_number)
        conn.commit()
    finally:
        # One connection is opened per comment; close it on every path
        # (early return and errors included) so they do not pile up.
        conn.close()


def post_comment_to_lemmy(cursor: sqlite3.Cursor, lemmy: Any, github_repo: str, comment: GitHubComment, post_id: Optional[int], issue_number: int) -> None:
    """Create the Lemmy comment for ``comment`` and record it through ``cursor``.

    Nothing is written to the database when the Lemmy comment could not be
    created. Committing is left to the caller.
    """
    lemmy_post_url = f"{LEMMY_INSTANCE_URL}/post/{post_id}"
    comment_url = f"{GITHUB_URL_BASE}/{github_repo}/issues/{issue_number}#issuecomment-{comment.id}"
    logging.info(f"Posting comment {comment.url} to Lemmy post {lemmy_post_url}")
    lemmy_comment_id: Optional[int] = create_lemmy_comment(lemmy, post_id, comment)

    if not lemmy_comment_id:
        # No exception is active here, so logging.exception would only append
        # a meaningless "NoneType: None" traceback; log a plain error instead.
        logging.error(f"Error creating Lemmy comment {lemmy_comment_id} to {lemmy_post_url} from Github comment {comment.url}")
        return

    logging.info(f"Posted comment {comment_url} to Lemmy post {lemmy_post_url}")
    insert_comment_to_database(cursor, comment.id, lemmy_comment_id, comment)


def get_existing_comment_id(cursor: sqlite3.Cursor, github_comment_id: int) -> Optional[int]:
    """Return the Lemmy comment id recorded for ``github_comment_id``, or ``None`` if there is none."""
    logging.info(f"Checking if comment with GitHub comment ID: {github_comment_id} exists")
    row = cursor.execute(
        "SELECT lemmy_comment_id FROM comments WHERE github_comment_id=?",
        (github_comment_id,),
    ).fetchone()

    if row is None:
        logging.info(f"No existing comment found with GitHub comment ID: {github_comment_id}")
        return None

    logging.info(f"Found existing comment with GitHub comment ID: {github_comment_id}")
    lemmy_id: int = row[0]
    return lemmy_id


def fetch_issue_data(github_repo: str) -> List[Tuple[str, Optional[int]]]:
    """Return ``(issue_url, lemmy_post_id)`` rows from ``posts`` whose URL belongs to ``github_repo``.

    NOTE(review): the query filters on an ``issue_url`` column, but the
    ``posts`` table created in this file has no such column. Verify the
    schema before relying on this function.
    """
    logging.info("Fetching the GitHub issue number and Lemmy post ID for all issues")
    connection: sqlite3.Connection = sqlite3.connect(DB_FILE)
    db_cursor: sqlite3.Cursor = connection.cursor()
    url_pattern = f"https://github.com/{github_repo}/issues/%"
    db_cursor.execute("SELECT issue_url, lemmy_post_id FROM posts WHERE issue_url LIKE ?", (url_pattern,))
    rows = db_cursor.fetchall()
    logging.info(f"Fetched {len(rows)} issues")
    return rows


def process_repo(lemmy: Any, community_id: int, github_repo: str) -> None:
    """Process one repository, logging (and not propagating) any error it raises."""
    try:
        logging.info(f"Processing repository {github_repo}")
        process_issues(lemmy, community_id, github_repo)
    except Exception as error:
        # Contain the failure so that the caller can carry on.
        logging.exception(f"Error occurred while processing repository {github_repo}\n{error}")


def main() -> None:
    """Run one full pass: prepare the database, log in to Lemmy and process every configured repository."""
    logging.info("Running main function")
    # The database helpers in this file open their own connections, so the one
    # returned here is only needed for creating the tables. Close it instead
    # of leaking a connection on every run.
    initialize_database().close()
    lemmy = initialize_lemmy_instance()
    community_id = lemmy.discover_community(LEMMY_COMMUNITY_NAME)

    for github_repo in REPOSITORIES:
        process_repo(lemmy, community_id, github_repo)


def run_periodically() -> None:
    """Schedule ``main`` to run every hour and poll the scheduler once a minute, forever.

    Errors raised while running pending jobs are logged and do not stop the
    loop.
    """
    logging.info("Starting periodic run")
    schedule.every(1).hours.do(main)

    poll_interval_seconds = 60
    while True:
        try:
            schedule.run_pending()
        except Exception as error:
            logging.exception(f"Error occurred during scheduling\n{error}")
        time.sleep(poll_interval_seconds)


# Script entry point: run one pass immediately, then hand over to the scheduler loop.
if __name__ == "__main__":
    try:
        logging.info("Starting script")
        main()
        # run_periodically() loops forever, so control only leaves it through an exception.
        run_periodically()
    except Exception as e:
        # Last-resort handler: log the failure with its traceback.
        logging.exception(f"Error occurred during script execution\n{e}")

requirements.txt

pythorhead==0.12.3
schedule==1.2.0
backoff==2.2.1
feedparser==6.0.10
16
... (programming.dev)
submitted 11 months ago* (last edited 6 months ago) by [email protected] to c/[email protected]
 

Hey everyone, I was wondering what you think about having a bot for nightly builds to test the latest changes and discover regressions. This would allow some instances to test the latest changes and discover regressions so that we don't get stuck with those until the next big release. I think this would be a great way to improve the user experience. What do you think?

2
... (programming.dev)
submitted 1 year ago* (last edited 1 year ago) by [email protected] to c/[email protected]
 

I'm looking for more cost-effective alternatives to Perplexity.ai that offer Claude 2 or similar integration along with search capabilities for factual assistance, ideally around $10/month instead of the $20/month subscription fee for Perplexity.ai. I've found Phind.com, a developer-focused search engine that uses GPT-4 to answer technical questions with code examples and detailed explanations. While it may not be as good as Perplexity.ai, it offers more free uses. Are there any other options that combine large context and search features at a lower price point?

 

Fellow Lemmy users,

The Lemmy development team is considering adding a new tag system that would allow us to tag posts with keywords. This could make it easier to search for and find content on Lemmy.

Before implementing this, the team would like our feedback as users. Specifically:

  • Do you think having post tags would be helpful on Lemmy? Why or why not?

  • How should tags be displayed and integrated into Lemmy?

Please share your thoughts on whether you'd find a tag system useful, and if so, how you'd want it implemented. The dev team reads the feedback and will use it to decide how to proceed.

To give your input, you can comment or vote here or on the GitHub issue[^1].

Thanks for helping shape Lemmy! This is our community, so please speak up.

Looking forward to hearing your thoughts.

[^1]: GitHub — Post tags

18
... (github.com)
submitted 1 year ago* (last edited 1 year ago) by [email protected] to c/[email protected]
 

Fellow Lemmy users,

The Lemmy development team is considering adding a new tag system that would allow us to tag posts with keywords. This could make it easier to search for and find content on Lemmy.

Before implementing this, the team would like our feedback as users. Specifically:

  • Do you think having post tags would be helpful on Lemmy? Why or why not?

  • What kinds of tags would you want to use? Topics, genres, etc?

  • How should tags be displayed and integrated into Lemmy?

Please share your thoughts on whether you'd find a tag system useful, and if so, how you'd want it implemented. The dev team reads the feedback and will use it to decide how to proceed.

To give your input, you can comment or vote here or on the GitHub issue[^1].

Thanks for helping shape Lemmy! This is our community, so please speak up.

Looking forward to hearing your thoughts.

[^1]: GitHub — Post tags

1
... (github.com)
submitted 1 year ago* (last edited 6 months ago) by [email protected] to c/[email protected]
4
... (github.com)
submitted 1 year ago* (last edited 6 months ago) by [email protected] to c/[email protected]
view more: ‹ prev next ›