addet qfc not as a submodule

This commit is contained in:
pika 2024-08-18 15:33:19 +02:00
parent cc8e05e3d5
commit bbf0120ad6
13 changed files with 806 additions and 5 deletions

0
.qfc/qfc/__init__.py Normal file
View file

60
.qfc/qfc/ansi.py Normal file
View file

@ -0,0 +1,60 @@
import sys
BOLD = "\x1b[1m"
CLEAR_FORMATTING = "\x1b[0m"
ERASE_SCREEN = "\x1b[J"
ERASE_LINE = "\x1b[2K"
FOREGROUND_BLACK = "\x1b[30m"
BACKGROUND_WHITE = "\x1b[47m"
def _CURSOR_COLUMN(pos):
# ideally, CSI n G escape code is used to set the absolute horizental position
# Sadly, it's not an Ansi.sys escape code (not supported in all terminals)
# This shim try to simulate it by moving cursor backwards 1000 characters(terminal row width is assumed to be less than that number, which may not be the case for aliens laptops :))
# Then, move cursor pos - 1 characthers forward (the - 1 is because the cursor is at position 1)
c = "\x1b[1000D"
if pos:
c += "\x1b["+str(pos - 1)+"C"
return c
def _CURSOR_PREVIOUS_LINES(number):
return "\x1b["+str(number)+"A"
def _CURSOR_NEXT_LINES(number):
return "\x1b["+str(number)+"B"
def select_text(text):
return (FOREGROUND_BLACK +
BACKGROUND_WHITE +
text.replace(
CLEAR_FORMATTING,
CLEAR_FORMATTING + FOREGROUND_BLACK + BACKGROUND_WHITE)+
CLEAR_FORMATTING)
def bold_text(text):
return (BOLD +
text.replace(
CLEAR_FORMATTING,
CLEAR_FORMATTING + BOLD)+
CLEAR_FORMATTING)
def move_cursor_line_beggining():
sys.stdout.write(_CURSOR_COLUMN(0))
def move_cursor_horizental(n):
sys.stdout.write(_CURSOR_COLUMN(n))
def move_cursor_previous_lines(number_of_lines):
sys.stdout.write(_CURSOR_PREVIOUS_LINES(number_of_lines))
def move_cursor_next_lines(number_of_lines):
sys.stdout.write(_CURSOR_NEXT_LINES(number_of_lines))
def erase_from_cursor_to_end():
sys.stdout.write(ERASE_SCREEN)
def erase_line():
sys.stdout.write(ERASE_LINE)
def flush():
sys.stdout.flush()

231
.qfc/qfc/core.py Normal file
View file

@ -0,0 +1,231 @@
import os
import re
from . import keys
from . import ui
from . import readchar
from . import dirhandler
def get_selected_command_or_input(search):
state = State(search)
# draw the screen (prompt + matched strings)
ui.refresh(state)
# wait for user input
prompt(state)
# clear the screen
ui.erase()
# state.input holds the path selected by the user
return state.input
def prompt(state):
while True:
c = readchar.get_symbol()
if c == keys.ENTER:
if state.get_matches():
state.append_match_to_input()
break
elif c == keys.CTRL_F:
break
elif c == keys.TAB:
if state.get_matches():
state.append_match_to_input()
elif c == keys.CTRL_C or c == keys.ESC:
state.reset_input()
break
elif c == keys.CTRL_U:
state.clear_input()
elif c == keys.BACKSPACE:
state.set_input(state.input[0:-1])
elif c == keys.UP or c == keys.CTRL_K:
state.select_previous()
elif c == keys.DOWN or c == keys.CTRL_J:
state.select_next()
elif c == keys.LEFT or c == keys.CTRL_H:
state.go_back()
elif c == keys.RIGHT or c == keys.CTRL_L:
if state.get_matches():
state.append_match_to_input()
elif c < 256 and c >= 32:
state.set_input(state.input + chr(c))
ui.refresh(state)
class State(object):
''' The Current User state, including user written characters, matched commands, and selected one '''
def __init__(self, default_input):
self._selected_command_index = 0
self.matches = []
self.default_input = default_input
self.set_input(default_input)
def get_matches(self):
return self.matches
def reset_input(self):
self.input = self.default_input
def set_input(self, input):
self.input = input if input else ""
self._update()
def append_match_to_input(self):
self.set_input(os.path.join(os.path.dirname(self.input), self.get_selected_match()))
def go_back(self):
isdir = is_dir(self.input)
input_stripped = self.input.rstrip(os.sep)
if not input_stripped:
return
input_splitted = input_stripped.split(os.sep)
entry_name = input_splitted[-1]
if isdir:
entry_name += os.sep
new_input = os.sep.join(input_splitted[0:-1])
if new_input:
new_input += os.sep
self.set_input(new_input)
self.set_selected_entry(entry_name)
def clear_input(self):
self.set_input("")
def clear_selection(self):
self._selected_command_index = 0
def select_next(self):
self._selected_command_index = (self._selected_command_index + 1) % len(self.matches) if len(self.matches) else 0
def select_previous(self):
self._selected_command_index = (self._selected_command_index - 1) % len(self.matches) if len(self.matches) else 0
def _update(self):
self.matches = get_matches(os.getcwd(),self.input)
self._selected_command_index = 0
def get_output(self):
return self.input
def get_selected_match(self):
if len(self.matches):
return self.matches[self._selected_command_index]
else:
raise Exception('No matches found')
def set_selected_entry(self, entry):
if not (entry in self.matches):
return
self._selected_command_index = self.matches.index(entry)
def get_matches(root_dir, user_input):
start_dir = join_paths(root_dir, os.path.dirname(user_input))
search_str = os.path.basename(user_input)
if not os.path.isdir(start_dir):
return []
source_files = dirhandler.get_source_files(start_dir)
filtered_files = filter_files(source_files, search_str)
sorted_files = sort_matches(filtered_files, search_str)
return sorted_files
def filter_files(files, search_str):
""" Filter a list of files based on a search string
:param files: list of files to filter (ie ['/a','/a/b', '/a/b/c', '/b']), the order doesn't matter. 'a' and 'a/' are considered different.
:param search_str, the filtering string (ie 'a')
This function return only first level files/dirs that match the given search string. That is, filtering ['/a','/a/b','/a/b/c'] with the string 'a' returns only ['/a'],
This is to avoid polluting the screen with all files inside a directory when a user look for that directory
"""
matched = set()
if not search_str:
for f in files:
# right strip to the first seperator(ie '/')
f = f[:_index_or_len(f, os.sep)+1]
matched.add(f)
else:
search_str = search_str.lower()
for f in files:
if search_str in f.lower():
index = f.lower().index(search_str)
trail = f[index:]
f = f[:index + _index_or_len(trail, os.sep)+1]
matched.add(f)
return matched
def sort_matches(matches, string):
""" Sort paths according to a string """
files = sorted(matches, key=lambda s: s.lower())
return sorted(files, key=lambda p:get_weight(p, string))
def get_weight(path, string):
""" calculate how much a path matches a given string on a scale of 0->10000, a lower number means a better match.
The string should be present in the path, or this function will fail
The weight is calculated using the following formula:
0 00 0
| | |
Number of elements in the path <----------------| | |--------------------------> is a directory(0) or not(1)
(ie 2 for 'aa/bb' and 3 for 'aa/bb/cc') |
|
v
There is a word in the matched path element that exactly match the given string? ('aa_bb' matches user input 'aa' but doesn't match 'a')
|
No <--------------------|-----------------------------> yes
| |
| |
| |
v v
path element starts with the user input? There is only one word in the matched path element ('aa' but not 'aa_bb'),
| which means the path elements exactly match the string
| |
No <-------|-----Yes(2) No -----|----- Yes (0)
| |
v v
index of the string in the path element + 10 (index of the matched word within path element words + 1)
Maximums are added to make sure things doesn't overlap
"""
weight = 0
if not is_dir(path):
weight += 1
string = string.lower()
p = path.rstrip(os.sep).lower()
path_elems = p.split(os.sep)
weight += min(len(path_elems) * 1000, 10000) # Max 10 elements in path(set it to 10 for longer paths)
if not string:
return weight
elm = next(e for e in path_elems if string in e)
elm_words = [_f for _f in re.split('[\W_]+', elm) if _f]
if string in elm_words:
if len(elm_words) > 1:
weight += 10 * (min(elm_words.index(string),8) + 1) # Max 8 words per path entry
else:
if elm.startswith(string):
weight += 10 * 2
else:
weight += 10 * (10 + min(elm.index(string), 89)) # Max path element with 89 characters
return weight
# Helper functions:
def join_paths(p1, p2):
p1 = (os.path.expandvars(os.path.expanduser(p1)))
p2 = (os.path.expandvars(os.path.expanduser(p2)))
return os.path.normpath(os.path.join(p1, p2))
def _index_or_len(s, c):
if c in s:
return s.index(c)
else:
return len(s)
def is_dir(p):
if p.endswith(os.sep):
return True
return False

114
.qfc/qfc/dirhandler.py Normal file
View file

@ -0,0 +1,114 @@
import os
import subprocess
import sys
class CVSHandler():
""" Handler of CVS (fir, mercurial...) directories,
The main purpose of this class is to cache external cvs command output, and determine the appropriate files to yield when navigating to a subdirectory of a project.
This basically means that the external command is run once (ie git ls-files), cached, and when calling get_source_files on a subdirectory of the project root (ie project-root/subdir),
filtering from all project files of is done here.
"""
def __init__(self, cvs):
self._roots_cache = {}
self._not_tracked_cache = set()
self.cvs = cvs
def _get_root_from_cache(self, directory):
""" a directory is considered cached if it's the project root or a subdirectory of that project root.
returns the project root dir, or None if the directory is not cached.
"""
if directory in self._roots_cache:
return directory
if os.path.dirname(directory) == directory:
return None
return self._get_root_from_cache(os.path.dirname(directory))
def get_source_files(self, directory):
if directory in self._not_tracked_cache:
return None
root_dir = self._get_root_from_cache(directory)
if not root_dir:
try:
# check if it's a tracked cvs dir, if yes, get the project root and the source files
root_dir = self.cvs._get_root(directory)
self._roots_cache[root_dir] = self.cvs._get_tracked_files(root_dir)
except Exception as e:
# not a cvs tracked dir, save it to not issue that command again
self._not_tracked_cache.add(directory)
return None
files = self._roots_cache[root_dir]
# the passed directory argument is a subdirectory of the project root
if directory != root_dir:
rel_dir = os.path.relpath(directory, root_dir)
files = [f[len(rel_dir)+1:] for f in files if f.startswith(rel_dir)]
return files
class Git():
@staticmethod
def _get_root(directory):
return run_command("cd %s && git rev-parse --show-toplevel" % directory).strip()
@staticmethod
def _get_tracked_files(directory):
return run_command("cd %s && git ls-files && git ls-files --others --exclude-standard" % directory).strip().split('\n')
class Mercurial():
@staticmethod
def _get_root(directory):
return run_command("cd %s && hg root" % directory).strip()
@staticmethod
def _get_tracked_files(directory):
return run_command("cd %s && (hg status -marcu | cut -d' ' -f2)" % directory).strip().split('\n')
class DefaultDirHandler():
""" The default directory handler uses the 'find' external program to return all the files inside a given directory up to MAX_depth depth (ie, if maxdepth=2, returns all files inside that dir, and all files in a subdir of that directory)"""
def __init__(self):
self._cache = {}
self.MAX_DEPTH = 3
def _walk_down(self, start_dir):
try:
out = run_command("find %s -maxdepth %s -type f -not -path '*/\.*'" % (start_dir, self.MAX_DEPTH))
except subprocess.CalledProcessError as e:
# Find returns a non 0 exit status if listing a directory fails (for example, permission denied), but still output all files in other dirs
# ignore those failed directories.
out = e.output
if sys.version_info >= (3, 0):
out = out.decode('utf-8')
if not out:
return []
files = out.split('\n')
return [os.path.relpath(f, start_dir) for f in files if f]
def get_source_files(self, start_dir):
if not start_dir in self._cache:
self._cache[start_dir] = self._walk_down(start_dir)
return self._cache[start_dir]
def run_command(string):
''' fork a process to execute the command string given as argument, returning the string written to STDOUT '''
DEVNULL = open(os.devnull, 'wb')
out = subprocess.check_output(string, stderr=DEVNULL, shell=True)
if sys.version_info >= (3, 0):
return out.decode('utf-8')
return out
git = CVSHandler(Git)
hg = CVSHandler(Mercurial)
default = DefaultDirHandler()
def get_source_files(directory):
""" check first if the given directory is inside a git tracked project, if no, check with mercurial, if no, fallback to the default handler """
files = git.get_source_files(directory)
# if the returned files list is empty, it's considered not a tracked directory
if files:
return files
files = hg.get_source_files(directory)
if files:
return files
return default.get_source_files(directory)

19
.qfc/qfc/keys.py Normal file
View file

@ -0,0 +1,19 @@
CTRL_C = 3 # Ctrl-c
CTRL_H = 8 # ctrl-h
CTRL_J = 10 # ctrl-h
CTRL_K = 11 # ctrl-h
CTRL_L = 12 # ctrl-h
CTRL_F = 6 # ctrl-f
ENTER = 13 # Enter
CTRL_U = 21 # Ctrl+u
ESC = 27 # Escape
BACKSPACE = 127 # Backspace
TAB = 9 # Tab
RIGHT = -1 # FAKE CODE to abstract away the fact that a multibyte string is needed to represent arrow keys
DOWN = -2 # same
UP = -3 # same
LEFT = -4 # same
SHIFTTAB = -5 # same
SHIFTENTER = -6
SPACE = 32
ANTISLASH = 47

65
.qfc/qfc/readchar.py Normal file
View file

@ -0,0 +1,65 @@
import sys
import tty
import termios
import fcntl
import os
from . import keys
def get_symbol():
''' Read a symbol, which can be a single byte character or a multibyte string'''
ch = read_char()
ch_code = ord(ch)
# check for multibyte string
if ch_code == keys.ESC:
ch = read_char_no_blocking()
if ch == '':
# ESC key pressed
return keys.ESC
elif ch != 'O' and ch != '[':
return ord(ch)
else:
ch = read_char_no_blocking()
if ch == 'A':
return keys.UP
elif ch == 'B':
return keys.DOWN
elif ch == 'C':
return keys.RIGHT
elif ch == 'D':
return keys.LEFT
elif ch == 'Z':
return keys.SHIFTTAB
return ch_code
def read_char():
''' Read a character '''
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(fd, termios.TCSADRAIN)
ch = sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
def read_char_no_blocking():
''' Read a character in nonblocking mode, if no characters are present in the buffer, return an empty string '''
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
old_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
try:
tty.setraw(fd, termios.TCSADRAIN)
fcntl.fcntl(fd, fcntl.F_SETFL, old_flags | os.O_NONBLOCK)
return sys.stdin.read(1)
except IOError as e:
ErrorNumber = e[0]
# IOError with ErrorNumber 11(35 in Mac) is thrown when there is nothing to read(Resource temporarily unavailable)
if (sys.platform.startswith("linux") and ErrorNumber != 11) or (sys.platform == "darwin" and ErrorNumber != 35):
raise
return ""
finally:
fcntl.fcntl(fd, fcntl.F_SETFL, old_flags)
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)

60
.qfc/qfc/ui.py Normal file
View file

@ -0,0 +1,60 @@
from . import ansi
import os
import math
def _get_terminal_columns():
''' get the number of terminal columns, used to determine spanned lines of a mark(reqansired for cursor placement) '''
_, columns = os.popen('stty size', 'r').read().split()
return int(columns)
def erase():
''' the commandline cursor is always at the first line (user prompt)
Therefore, erasing the current and following lines clear all output
'''
ansi.move_cursor_line_beggining()
ansi.erase_from_cursor_to_end()
def refresh(state):
''' Redraw the output, this function will be triggered on every user interaction(key pressed)'''
erase()
lines, num_rows = _construct_output(state)
for line in lines:
print(line)
# go up
ansi.move_cursor_previous_lines(num_rows)
# palce the cursor at the end of first line)
ansi.move_cursor_horizental(len(lines[0])+1)
ansi.flush()
def _construct_output(state):
columns = _get_terminal_columns()
def number_of_rows(line):
return int(math.ceil(float(len(line))/columns))
displayed_lines = []
# Number of terminal rows spanned by the output, used to determine how many lines we need to go up(to place the cursor after the prompt) after displaying the output
num_rows = 0
prompt_line = 'Path: ' + state.input
displayed_lines.append(prompt_line)
num_rows += number_of_rows(prompt_line)
matches = state.get_matches()
if matches:
# display commands from Max(0,selected_command_index - 10 +1 ) to Max(10,SelectedCommandIndex + 1)
selected_command_index = matches.index(state.get_selected_match())
matches_to_display = matches[max(0, selected_command_index - 10 + 1):max(10, selected_command_index + 1)]
for index, m in enumerate(matches_to_display):
fm = ' ' + m
num_rows += number_of_rows(fm)
# Formatting text(make searched word bold)
for w in state.input.split(' '):
if w:
fm = fm.replace(w, ansi.bold_text(w))
# highlighting selected command
if m == state.get_selected_match():
fm = ansi.select_text(fm)
displayed_lines.append(fm)
else:
not_found_line = 'Nothing found'
displayed_lines.append(not_found_line)
num_rows += number_of_rows(not_found_line)
return displayed_lines, num_rows