# I saved this script as a gist because I have rewritten it many times.
# It supports remembering line numbers and similar details, which are not used here.
# It was originally written in C by me and later ported to Python.
# The original application did use these features.
#
# Written by retoor@molodetz.nl
#
# This script processes text files to identify words, their positions, and
# their frequency of occurrence. It uses command-line arguments to query or
# display popular words and stores the results in a SQLite database.
#
# Imports:
# - argparse: For handling command-line arguments
# - sqlite3: A library to control and manage SQLite databases
# - pathlib: To work with filesystem paths in an object-oriented way
#
# MIT License:
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse
import sqlite3
import pathlib

# Defaults used by the command-line entry point.
DEFAULT_DATABASE_PATH = "tags.db"
DEFAULT_LOG_DIRECTORY = "logs_plain"

# Number of characters requested per read while scanning a file.
READ_CHUNK_SIZE = 65536


def is_valid_character(character):
    """Return True when *character* can be part of a word.

    A word character is anything ``str.isalnum`` accepts, or an underscore.
    """
    return character.isalnum() or character == "_"


def _record_word(word_counts, characters, start, end, line, paragraph, current_paragraph):
    """Print the details of one finished word and add it to *word_counts*."""
    word = "".join(characters)
    print(
        f"{word} {start} {end} {len(characters)} {line} {paragraph} {current_paragraph}"
    )
    word_counts[word] = word_counts.get(word, 0) + 1


def process_file(file_path):
    """Scan a text file and count how often each word occurs.

    A word is a maximal run of characters accepted by ``is_valid_character``.
    For every word found, one line is printed with: the word, its 1-based
    inclusive start and end positions, its length, the line and paragraph
    numbers at the start of the word, and the current paragraph number.

    Args:
        file_path: Path of the file to read (anything ``open`` accepts).

    Returns:
        A dict mapping each word to its number of occurrences in the file.
    """
    word_counts = {}
    current_word = []
    word_start_position = -1  # -1 means "not inside a word"
    word_line_number = 0
    word_paragraph_number = 0
    consecutive_newlines = 0
    position = 0
    line_number = 1
    paragraph_number = 1

    # errors="replace" keeps a single undecodable byte from aborting the scan.
    with open(file_path, "r", errors="replace") as file:
        # Read in chunks instead of one character per call.
        while chunk := file.read(READ_CHUNK_SIZE):
            for character in chunk:
                position += 1

                # NOTE: the "line" counter advances on "." rather than on a
                # newline, so it effectively counts sentences.
                if character == ".":
                    line_number += 1
                if character == "\n":
                    consecutive_newlines += 1

                if not is_valid_character(character):
                    if word_start_position > -1:
                        # The separator sits at `position`, so the word ended
                        # one character earlier.
                        _record_word(
                            word_counts,
                            current_word,
                            word_start_position,
                            position - 1,
                            word_line_number,
                            word_paragraph_number,
                            paragraph_number,
                        )
                        word_start_position = -1
                        current_word = []
                    continue

                # Two or more newlines since the previous word character mark
                # a paragraph break.
                if consecutive_newlines >= 2:
                    paragraph_number += 1
                # Always reset, otherwise single newlines separated by words
                # would add up to a false paragraph break.
                consecutive_newlines = 0

                current_word.append(character)
                if word_start_position == -1:
                    word_start_position = position
                    word_line_number = line_number
                    word_paragraph_number = paragraph_number

    # Flush a word that runs up to the end of the file.
    if word_start_position > -1:
        _record_word(
            word_counts,
            current_word,
            word_start_position,
            position,
            word_line_number,
            word_paragraph_number,
            paragraph_number,
        )

    return word_counts


class WordDatabase:
    """Word-frequency storage backed by a SQLite ``words`` table.

    The instance can be used as a context manager; leaving the ``with`` block
    commits pending changes and closes the connection.
    """

    # `word` is the primary key so the per-word UPDATE in `insert` uses an
    # index instead of scanning the whole table.
    _CREATE_TABLE_SQL = """
        CREATE TABLE IF NOT EXISTS words (
            word TEXT NOT NULL PRIMARY KEY,
            count INTEGER NOT NULL
        )
    """

    def __init__(self, path):
        """Open (or create) the database at *path* and make sure the table exists."""
        self.path = path
        self.connection = sqlite3.connect(path)
        self.cursor = self.connection.cursor()
        # Without the table, querying a fresh database would raise
        # "no such table: words".
        self.cursor.execute(self._CREATE_TABLE_SQL)
        self.connection.commit()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def reset(self):
        """Drop all stored words and recreate an empty table."""
        self.cursor.execute("DROP TABLE IF EXISTS words")
        self.cursor.execute(self._CREATE_TABLE_SQL)
        self.connection.commit()

    def insert(self, word, count):
        """Add *count* occurrences of *word*, creating its row if needed.

        The change is not committed; call ``commit`` when the batch is done.
        """
        self.cursor.execute(
            "UPDATE words SET count = count + ? WHERE word = ?", (count, word)
        )
        if self.cursor.rowcount == 0:
            self.cursor.execute(
                "INSERT INTO words (word, count) VALUES (?, ?)", (word, count)
            )

    def commit(self):
        """Commit pending changes."""
        self.connection.commit()

    def total_count(self):
        """Return the sum of all word counts (0 for an empty table)."""
        # SUM() over zero rows is NULL; COALESCE turns that into 0.
        self.cursor.execute("SELECT COALESCE(SUM(count), 0) FROM words")
        result = self.cursor.fetchone()
        return result[0] if result else 0

    def get(self, word):
        """Return the stored count for *word*, or 0 when it is unknown."""
        self.cursor.execute("SELECT count FROM words WHERE word = ?", (word,))
        result = self.cursor.fetchone()
        return result[0] if result else 0

    def most_popular(self, count):
        """Return up to *count* ``(word, count)`` rows, most frequent first."""
        self.cursor.execute(
            "SELECT word, count FROM words ORDER BY count DESC LIMIT ?", (count,)
        )
        return self.cursor.fetchall()

    def close(self):
        """Commit, close the connection and print "Database closed".

        Calling it again after the connection is closed does nothing.
        """
        connection = getattr(self, "connection", None)
        if connection is None:
            return
        self.connection = None
        try:
            connection.commit()
        finally:
            connection.close()
        print("Database closed")

    def __del__(self):
        # Fallback for callers that never call close(); errors raised during
        # garbage collection cannot be handled meaningfully, so ignore them.
        try:
            self.close()
        except sqlite3.Error:
            pass


def index(database=None, directory=DEFAULT_LOG_DIRECTORY):
    """Count the words of every file in *directory* and store them.

    Args:
        database: ``WordDatabase`` to fill. When omitted, the default
            database file is opened and closed again afterwards.
        directory: Directory whose files are processed.
    """
    from pprint import pprint as pp

    owns_database = database is None
    if owns_database:
        database = WordDatabase(DEFAULT_DATABASE_PATH)
    try:
        # No explicit BEGIN: sqlite3 opens a transaction before the first
        # UPDATE/INSERT, and a manual BEGIN fails when one is already open.
        # Everything is still committed once, after the last file.
        for file_path in sorted(pathlib.Path(directory).iterdir()):
            if not file_path.is_file():
                continue  # sub-directories cannot be opened as text files
            for word, count in process_file(file_path).items():
                database.insert(word, count)
        database.commit()

        pp(database.most_popular(100))
        # The original script printed a literal 0 here; kept so the output
        # of an indexing run is unchanged.
        print(0)
    finally:
        if owns_database:
            database.close()


def build_parser():
    """Create the command-line parser (``--find``, ``--index``, ``--popular``)."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--find", type=str, required=False, default="",
        help="print the stored count of this word",
    )
    parser.add_argument(
        "--index", action="store_true",
        help="rebuild the database from the log directory",
    )
    parser.add_argument(
        "--popular", action="store_true",
        help="print the most frequent words and the total word count",
    )
    return parser


def main(argv=None):
    """Run the command-line interface.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]``.
    """
    args = build_parser().parse_args(argv)
    with WordDatabase(DEFAULT_DATABASE_PATH) as database:
        if args.find:
            print(database.get(args.find))
        if args.popular:
            for item in database.most_popular(300):
                print(item)
            print(database.total_count())
        if args.index:
            database.reset()
            index(database)


if __name__ == "__main__":
    main()