From aa167aee07ba8bfcf9777253e9c09fbeb64824c3 Mon Sep 17 00:00:00 2001 From: retoor Date: Wed, 5 Nov 2025 14:12:23 +0100 Subject: [PATCH] refactor: improve tokenizer and database classes --- CHANGELOG.md | 8 +++ tokenizer.py | 189 +++++++++++++++++++++++++++------------------------ 2 files changed, 109 insertions(+), 88 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2992c15..cb95940 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ + +## Version 0.5.0 - 2025-11-05 + +Users can now run the application inside a chroot container. Developers can use the new `chroot.py` script to initialize and enter these containers. + +**Changes:** 2 files, 118 lines +**Languages:** Markdown (8 lines), Python (110 lines) + ## Version 0.4.0 - 2025-11-05 Testing has been added to verify the HTTP functionality. This improves the reliability and quality of the HTTP features. diff --git a/tokenizer.py b/tokenizer.py index 82d0f99..4e5783c 100644 --- a/tokenizer.py +++ b/tokenizer.py @@ -1,14 +1,14 @@ -# I saved this script as gist because I wrote it a lot of times. +# I saved this script as gist because I wrote it a lot of times. # It has support for remembering line numbers and so on what is not used. # It was originally written in C by me and ported to Python. -# The original application did use these features. +# The original application did use these features. # Written by retoor@molodetz.nl # This script processes text files to identify words, their positions, and their frequency of occurrence. It uses command-line arguments to query or display popular words and stores results in a SQLite database. -# Imports: +# Imports: # - argparse: For handling command-line arguments # - sqlite3: A library to control and manage SQLite databases # - pathlib: To work with filesystem paths in an object-oriented way @@ -33,141 +33,154 @@ import argparse import sqlite3 -import pathlib +import pathlib parser = argparse.ArgumentParser() -parser.add_argument('--find', type=str, required=False, default="") -parser.add_argument('--index', action='store_true') -parser.add_argument('--popular', action='store_true') +parser.add_argument("--find", type=str, required=False, default="") +parser.add_argument("--index", action="store_true") +parser.add_argument("--popular", action="store_true") args = parser.parse_args() -def is_valid_char(c): - return c.isalnum() or c == '_' + +def is_valid_character(character): + return character.isalnum() or character == "_" + def process_file(file_path): - word = [] - word_start = -1 - word_end = 0 - word_line = 0 - word_alinia = 0 + current_word = [] + word_start_position = -1 + word_end_position = 0 + word_line_number = 0 + word_paragraph_number = 0 word_length = 0 - new_line_count = 0 - pos = 0 - line = 1 - alinia = 1 - words = {} - with open(file_path, 'r') as f: - while c := f.read(1): + consecutive_newlines = 0 + position = 0 + line_number = 1 + paragraph_number = 1 + word_counts = {} + with open(file_path, "r") as file: + while character := file.read(1): + position += 1 + is_valid = True - pos += 1 - valid = True + if character == ".": + line_number += 1 + is_valid = False - if c == '.': - line += 1 - valid = False + if character == "\n": + consecutive_newlines += 1 + is_valid = False - if c == '\n': - new_line_count += 1 - valid = False + if not is_valid_character(character): + is_valid = False - if not is_valid_char(c): - valid = False + if not is_valid: + if word_start_position > -1: + word_end_position = position - 1 + word_length = word_end_position - word_start_position + word_string = "".join(current_word) + print( + f"{word_string} {word_start_position} {word_end_position} {word_length} {word_line_number} {word_paragraph_number} {paragraph_number}" + ) + if word_string not in word_counts: + word_counts[word_string] = 0 + word_counts[word_string] += 1 - if not valid: - if word_start > -1: - word_end = pos - 1 - word_length = word_end - word_start - word_str = ''.join(word) - print(f"{word_str} {word_start} {word_end} {word_length} {word_line} {word_alinia} {alinia}") - if word_str not in words: - words[word_str] = 0 - words[word_str] += 1 - - word_start = -1 - word = [] + word_start_position = -1 + current_word = [] continue - if new_line_count >= 2: - new_line_count = 0 - alinia += 1 + if consecutive_newlines >= 2: + consecutive_newlines = 0 + paragraph_number += 1 - word.append(c) + current_word.append(character) - if word_start == -1: - word_start = pos - word_line = line - word_alinia = alinia - return words + if word_start_position == -1: + word_start_position = position + word_line_number = line_number + word_paragraph_number = paragraph_number + return word_counts -class WordDb: + +class WordDatabase: def __init__(self, path): self.path = path - self.conn = sqlite3.connect(path) - self.cursor = self.conn.cursor() - self.conn.commit() - self.words = {} + self.connection = sqlite3.connect(path) + self.cursor = self.connection.cursor() + self.connection.commit() def reset(self): - self.words = {} self.cursor.execute("DROP TABLE IF EXISTS words") - self.cursor.execute(""" + self.cursor.execute( + """ CREATE TABLE words ( word TEXT NOT NULL, count INTEGER NOT NULL ) - """) - self.conn.commit() + """ + ) + self.connection.commit() def insert(self, word, count): - if word not in self.words: - self.words[word] = count - self.cursor.execute("INSERT INTO words (word, count) VALUES (?, ?)", (word, count)) - else: - self.words[word] += count - self.cursor.execute("UPDATE words SET count = ? WHERE word = ?", (self.words[word], word)) + self.cursor.execute( + "UPDATE words SET count = count + ? WHERE word = ?", (count, word) + ) + if self.cursor.rowcount == 0: + self.cursor.execute( + "INSERT INTO words (word, count) VALUES (?, ?)", (word, count) + ) def commit(self): - self.conn.commit() - + self.connection.commit() + def total_count(self): self.cursor.execute("SELECT SUM(count) FROM words") - return self.cursor.fetchone()[0] + result = self.cursor.fetchone() + return result[0] if result else 0 def get(self, word): self.cursor.execute("SELECT count FROM words WHERE word = ?", (word,)) - return self.cursor.fetchone()[0] + result = self.cursor.fetchone() + return result[0] if result else 0 def most_popular(self, count): - self.cursor.execute("SELECT word, count FROM words ORDER BY count DESC LIMIT ?", (count,)) + self.cursor.execute( + "SELECT word, count FROM words ORDER BY count DESC LIMIT ?", (count,) + ) return list(self.cursor.fetchall()) def __del__(self): self.commit() - self.conn.close() + self.connection.close() print("Database closed") -db = WordDb("tags.db") + +database = WordDatabase("tags.db") + def index(): - words = {} - for f in pathlib.Path("logs_plain").iterdir(): - for key, value in process_file(f).items(): - db.insert(key, value) - + database.connection.execute("BEGIN") + for file_path in pathlib.Path("logs_plain").iterdir(): + for word, count in process_file(file_path).items(): + database.insert(word, count) + database.connection.commit() + from pprint import pprint as pp - pp(db.most_popular(100)) - db.commit() - print(len(words.keys())) - + + pp(database.most_popular(100)) + print(0) + + if args.find: - print(db.get(args.find)) + print(database.get(args.find)) if args.popular: - for item in db.most_popular(300): + for item in database.most_popular(300): print(item) - print(db.total_count()) + print(database.total_count()) if args.index: - db.reset() - index() \ No newline at end of file + database.reset() + index()