This commit is contained in:
pika 2025-04-14 09:22:15 +02:00
commit cc8ffcfcc2
38 changed files with 6046 additions and 0 deletions

331
python/Caddy/addEntry.py Executable file

@@ -0,0 +1,331 @@
#!/usr/bin/env python3
import os
import re
import socket
def get_host_ip():
    """Get the host IP address"""
    try:
        # Create a socket to determine the outgoing IP address
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))  # Connect to Google DNS (doesn't send data)
        host_ip = s.getsockname()[0]
        s.close()
        return host_ip
    except Exception:
        return "127.0.0.1"  # Fallback to localhost
def get_user_input(prompt, default=None):
    """Interactive prompt with default values"""
    value = input(f"{prompt} [{default}]: ") or default
    return value
def find_caddyfile():
    """Check if the Caddyfile exists, otherwise ask for the path"""
    default_path = "./conf/Caddyfile"
    if os.path.exists(default_path):
        return default_path
    print("⚠ No Caddyfile found!")
    while True:
        custom_path = get_user_input("Enter the path to your Caddyfile")
        if os.path.exists(custom_path):
            return custom_path
        print("❌ File not found, please try again.")
def parse_existing_entries(caddyfile_path):
    """Parse the existing Caddyfile to extract all configured domains"""
    existing_entries = {}
    try:
        with open(caddyfile_path, "r") as file:
            content = file.read()
        # First, normalize the content to make parsing more reliable
        # This removes comments and normalizes whitespace
        lines = []
        in_comment = False
        for line in content.splitlines():
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            # Handle inline comments
            if '#' in line and not in_comment:
                line = line[:line.index('#')].strip()
            lines.append(line)
        normalized_content = '\n'.join(lines)
        # Use brace matching to properly extract domain blocks
        blocks = []
        current_position = 0
        while current_position < len(normalized_content):
            # Find the next domain block start
            block_start = normalized_content.find('{', current_position)
            if block_start == -1:
                break
            # Find the corresponding domain definition
            domain_start = normalized_content.rfind('\n', 0, block_start)
            if domain_start == -1:
                domain_start = 0
            else:
                domain_start += 1  # Skip the newline
            domain_def = normalized_content[domain_start:block_start].strip()
            # Find the end of this block (accounting for nested braces)
            brace_count = 1
            block_end = block_start + 1
            while brace_count > 0 and block_end < len(normalized_content):
                if normalized_content[block_end] == '{':
                    brace_count += 1
                elif normalized_content[block_end] == '}':
                    brace_count -= 1
                block_end += 1
            if brace_count == 0:
                # We found a complete block
                block_content = normalized_content[domain_start:block_end]
                # Only process blocks with reverse_proxy directives
                if 'reverse_proxy' in block_content:
                    blocks.append((domain_def, block_content))
            current_position = block_end
        # Process the extracted blocks
        for domain_def, block_content in blocks:
            # Extract the target from the reverse_proxy directive
            proxy_match = re.search(r'reverse_proxy\s+(https?:\/\/[\d\.]+:\d+|[\d\.]+:\d+)', block_content)
            if not proxy_match:
                continue
            target = proxy_match.group(1).strip()
            # Process domains (handle comma-separated lists correctly)
            domains = [d.strip() for d in domain_def.split(',')]
            # Process each domain
            for domain in domains:
                # Skip if it looks like a directive rather than a domain
                if '{' in domain or '}' in domain or not domain:
                    continue
                # Skip literal "Host" entries that likely come from host header directives rather than domains
                if domain == "Host" or domain == "{host}":
                    continue
                # Verify domain format (basic check)
                if not re.match(r'^[a-zA-Z0-9][-a-zA-Z0-9.]*[a-zA-Z0-9]$', domain) and not domain.startswith('*.'):
                    print(f"⚠️ Skipping invalid domain format: '{domain}'")
                    continue
                # Determine proxy type
                proxy_type = 'standard'
                if "https://" in target and "tls_insecure_skip_verify" in block_content:
                    if "versions h1.1" in block_content:
                        proxy_type = 'opnsense'
                    else:
                        proxy_type = 'https_skip_verify'
                # Store the entry
                existing_entries[domain] = {
                    'target': target,
                    'content': block_content,
                    'proxy_type': proxy_type
                }
                # Debug output for special cases
                if domain.lower() == "host":
                    print(f"⚠️ Warning: Found domain named 'host': {domain}")
    except Exception as e:
        print(f"❌ Error reading Caddyfile: {e}")
        import traceback
        print(traceback.format_exc())
    return existing_entries
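
# For illustration (domain and target here are made-up examples), the mapping
# returned by parse_existing_entries looks like:
#   {"example.com": {"target": "192.168.1.10:8080",
#                    "content": "example.com { ... }",
#                    "proxy_type": "standard"}}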
def format_caddy_entry(domains, target_ip, target_port, proxy_type):
    """Generate a properly formatted Caddy entry based on proxy type"""
    domain_list = ", ".join(domains)  # Multiple domains in a single line
    if proxy_type == "standard":
        return f"""
{domain_list} {{
    tls {{
        dns cloudflare {{env.CLOUDFLARE_API_TOKEN}}
    }}
    reverse_proxy {target_ip}:{target_port}
}}
"""
    elif proxy_type == "https_skip_verify":
        return f"""
{domain_list} {{
    tls {{
        dns cloudflare {{env.CLOUDFLARE_API_TOKEN}}
    }}
    reverse_proxy https://{target_ip}:{target_port} {{
        transport http {{
            tls
            tls_insecure_skip_verify
        }}
    }}
}}
"""
    elif proxy_type == "opnsense":
        return f"""
{domain_list} {{
    tls {{
        dns cloudflare {{env.CLOUDFLARE_API_TOKEN}}
    }}
    reverse_proxy https://{target_ip}:{target_port} {{
        transport http {{
            tls
            tls_insecure_skip_verify
            versions h1.1  # Enforce HTTP/1.1
        }}
        header_up Host {{host}}
        header_up X-Real-IP {{remote_host}}
        header_up X-Forwarded-Proto {{scheme}}
        header_up X-Forwarded-For {{remote}}
        header_down Strict-Transport-Security "max-age=31536000; includeSubDomains; preload"
        # Remove problematic headers
        header_up -Connection
        header_up -Upgrade
    }}
}}
"""
def update_existing_entry(caddyfile_path, domain, new_entry):
    """Replace an existing entry for the given domain"""
    try:
        with open(caddyfile_path, "r") as file:
            content = file.read()
        # New improved pattern to correctly match complete domain blocks
        # This regex matches the domain block from start to finish, including all braces
        domain_pattern = fr'(?m)^(?:(?:{re.escape(domain)}|[^{{,\s]+(?:,\s*{re.escape(domain)})(?:,\s*[^{{,\s]+)*|{re.escape(domain)}(?:,\s*[^{{,\s]+)+))\s*{{(?:[^{{}}]|{{(?:[^{{}}]|{{[^{{}}]*}})*}})*}}'
        pattern = re.compile(domain_pattern, re.DOTALL)
        match = pattern.search(content)
        if match:
            # Replace the block containing this domain with the new entry
            new_content = content[:match.start()] + new_entry.strip() + content[match.end():]
            with open(caddyfile_path, "w") as file:
                file.write(new_content)
            print(f"✅ Updated entry for {domain}")
        else:
            print(f"⚠ Could not find exact entry for {domain}. Adding as new entry.")
            with open(caddyfile_path, "a") as file:
                file.write(new_entry)
    except Exception as e:
        print(f"❌ Error updating Caddyfile: {e}")
        print(f"Error details: {str(e)}")
def add_caddy_entry(caddyfile_path):
    """Add new Caddy reverse proxy entries, showing existing entries first"""
    host_ip = get_host_ip()
    existing_entries = parse_existing_entries(caddyfile_path)
    print("\n📌 Existing Caddy Entries:")
    if existing_entries:
        for domain, data in existing_entries.items():
            print(f" 🔹 {domain} → {data['target']}")
    else:
        print(" ⚠ No entries found.")
    while True:
        domain = get_user_input("\nEnter the domain you want to configure", "")
        if not domain:
            print("❌ No domain provided. Skipping entry.")
            continue
        # If the domain exists, extract its current values
        existing_ip = host_ip
        existing_port = "8080"
        proxy_type = "standard"
        if domain in existing_entries:
            print(f"⚠ The domain {domain} already exists.")
            edit_existing = get_user_input("Do you want to edit this entry? (y/n)", "y").lower() == "y"
            if not edit_existing:
                continue
            existing_target = existing_entries[domain]['target']
            proxy_type = existing_entries[domain]['proxy_type']
            target_without_protocol = existing_target.replace("https://", "").replace("http://", "")
            if ":" in target_without_protocol:
                existing_ip, existing_port = target_without_protocol.split(":")
            else:
                existing_ip = target_without_protocol
                existing_port = "80"
        # Show host IP as an option
        target_ip_prompt = f"Enter the target IP (type 'host' for {host_ip})"
        target_ip = get_user_input(target_ip_prompt, existing_ip)
        # Replace 'host' with the actual host IP
        if target_ip.lower() == 'host':
            target_ip = host_ip
        target_port = get_user_input("Enter the target port", existing_port)
        print("\nChoose the proxy mode:")
        print("1️⃣ Standard (No HTTPS changes)")
        print("2️⃣ Internal HTTPS (skip verify)")
        print("3️⃣ OPNsense Mode (skip verify + enforce HTTP/1.1)")
        # Pre-fill the proxy type based on the detected configuration
        mode_choice_default = "1"
        if proxy_type == "https_skip_verify":
            mode_choice_default = "2"
        elif proxy_type == "opnsense":
            mode_choice_default = "3"
        mode_choice = get_user_input("Enter option (1/2/3)", mode_choice_default)
        proxy_type = "standard"
        if mode_choice == "2":
            proxy_type = "https_skip_verify"
        elif mode_choice == "3":
            proxy_type = "opnsense"
        new_entry = format_caddy_entry([domain], target_ip, target_port, proxy_type)
        if domain in existing_entries:
            update_existing_entry(caddyfile_path, domain, new_entry)
        else:
            try:
                with open(caddyfile_path, "a") as file:
                    file.write(new_entry)
                print(f"\n✅ New entry added: {domain} → {target_ip}:{target_port}")
            except Exception as e:
                print(f"\n❌ Error writing to Caddyfile: {e}")
                return
        # Ask if another entry should be added
        more_entries = get_user_input("\nDo you want to add or edit another entry? (y/n)", "n").lower() == "y"
        if not more_entries:
            break
    # Restart the Caddy container
    restart_caddy = get_user_input("\nDo you want to restart the Caddy container? (y/n)", "y").lower() == "y"
    if restart_caddy:
        os.system("docker compose restart caddy")
        print("🔄 Caddy container restarted!")


if __name__ == "__main__":
    caddyfile_path = find_caddyfile()
    add_caddy_entry(caddyfile_path)
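
For a quick sanity check, this is roughly what the script appends in standard mode. The call below is a hypothetical example (domain, IP, and port are made up); the block mirrors what format_caddy_entry above emits:

entry = format_caddy_entry(["example.com"], "192.168.1.10", "8080", "standard")
# entry now holds a Caddy site block equivalent to:
#
#   example.com {
#       tls {
#           dns cloudflare {env.CLOUDFLARE_API_TOKEN}
#       }
#       reverse_proxy 192.168.1.10:8080
#   }
print(entry)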

2
python/README.md Normal file

@@ -0,0 +1,2 @@
# python

354
python/checkpkg.py Executable file

@@ -0,0 +1,354 @@
#!/usr/bin/env python3
import argparse
import urllib.request
import json
import signal
import re
import shutil
import sys
import textwrap
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.error import HTTPError, URLError
# Color codes
COLORS = {
    'reset': '\033[0m',
    'bold': '\033[1m',
    'red': '\033[91m',
    'green': '\033[92m',
    'yellow': '\033[93m',
    'header': '\033[94m'
}


def colorize(text, color):
    """Add ANSI color codes if output is a terminal"""
    if not hasattr(colorize, 'is_tty'):
        colorize.is_tty = sys.stdout.isatty()
    return f"{COLORS[color]}{text}{COLORS['reset']}" if colorize.is_tty else text


# def get_terminal_width(default=80):
#     """Get terminal width with fallback"""
#     try:
#         return shutil.get_terminal_size().columns
#     except:
#         return default
signal.signal(signal.SIGINT, lambda s, f: exit(1))
REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) checkpkg/1.0',
    'Accept': 'application/json'
}

PM_MAPPINGS = {
    'apt': ['debian_', 'ubuntu_'],
    'dnf': ['fedora_'],
    'pacman': ['arch'],
    'aur': ['aur'],
    'apk': ['alpine_'],
    'zypper': ['opensuse_']
}

REPO_FORMATS = {
    'debian_': "Debian {}",
    'ubuntu_': "Ubuntu {}",
    'fedora_': "Fedora {}",
    'arch': "Arch",
    'aur': "AUR",
    'alpine_': "Alpine {}",
    'opensuse_': "openSUSE {}"
}
def version_key(version):
    """Create a sorting key for version comparison"""
    return [
        (0, int(part)) if part.isdigit() else (1, part.lower())
        for part in re.findall(r'(\d+|\D+)', version)
    ]
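
# For illustration: version_key("1.2.10") -> [(0, 1), (1, '.'), (0, 2), (1, '.'), (0, 10)],
# so "1.2.10" sorts above "1.2.9", which a plain string comparison gets wrong.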
def get_package_manager(repo):
    for pm, patterns in PM_MAPPINGS.items():
        if any(repo.startswith(p) for p in patterns):
            return pm
    return None
def format_repository(repo):
    for pattern, fmt in REPO_FORMATS.items():
        if repo.startswith(pattern):
            parts = repo.split('_')
            return fmt.format(parts[1] if len(parts) > 1 else '')
    return repo
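
# For illustration (repo names follow Repology's naming scheme):
#   format_repository("debian_12") -> "Debian 12"
#   format_repository("aur")       -> "AUR"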
def fetch_package_data(package):
    try:
        req = urllib.request.Request(
            f'https://repology.org/api/v1/project/{package}',
            headers=REQUEST_HEADERS
        )
        with urllib.request.urlopen(req, timeout=10) as response:
            return json.load(response)
    except HTTPError as e:
        if e.code == 403:
            print(colorize(f"Error: Repology blocked the request for {package} (try again later)", 'red'))
        return None
    except Exception:
        return None
def print_table(output, headers, selected_pms, args):
    """Print a formatted table with consistent alignment, or a list for small terminals"""
    # Terminal dimensions
    terminal_width = shutil.get_terminal_size().columns
    # Calculate package availability for coloring
    pkg_availability = {}
    for pkg in args.packages:
        versions = output.get(pkg, {})
        available_count = sum(1 for pm in selected_pms if versions.get(pm, 'Not found') != 'Not found')
        if available_count == 0:
            pkg_availability[pkg] = 'red'  # Not available anywhere
        elif available_count == len(selected_pms):
            pkg_availability[pkg] = 'green'  # Available everywhere
        else:
            pkg_availability[pkg] = 'yellow'  # Available in some places
    # Determine minimum required widths
    min_pkg_width = max(len(pkg) for pkg in args.packages) + 2
    min_pm_width = 10
    min_required_width = min_pkg_width + (min_pm_width * len(selected_pms)) + (3 * len(selected_pms))
    # If the terminal is too narrow for the table, use the list format instead
    if terminal_width < min_required_width and len(selected_pms) > 1:
        print_list_format(output, headers, selected_pms, args, pkg_availability)
        return
    # Calculate column widths
    padding = 1  # Space on each side of content
    # Package column width
    pkg_col_width = min(min_pkg_width + 4, max(min_pkg_width, terminal_width // (len(selected_pms) + 3)))
    # PM column widths (divide remaining space equally)
    remaining_width = terminal_width - pkg_col_width - (3 * len(selected_pms))
    pm_col_width = max(min_pm_width, remaining_width // len(selected_pms))
    col_widths = [pkg_col_width] + [pm_col_width] * len(selected_pms)
    # Print the header row
    header_cells = []
    for i, header in enumerate(headers):
        text = header.center(col_widths[i] - (2 * padding))
        cell = " " * padding + colorize(text, 'header') + " " * padding
        header_cells.append(cell)
    print(" | ".join(header_cells))
    # Print the separator line
    total_width = sum(col_widths) + (3 * (len(col_widths) - 1))
    print("-" * total_width)
    # Print each package row
    for pkg_idx, pkg in enumerate(args.packages):
        versions = output.get(pkg, {})
        # First collect all data for this package
        package_data = []
        # Package name (first column)
        package_data.append([colorize(pkg, pkg_availability[pkg])])
        # Version data for each package manager
        for pm in selected_pms:
            version = versions.get(pm, 'Not found')
            if version == 'Not found':
                package_data.append([colorize('-', 'red')])
                continue
            # Extract version number and repositories
            version_parts = []
            match = re.match(r'(.*?)\s+\((.*)\)$', version)
            if match:
                ver_num, repos = match.groups()
                version_parts.append(colorize(ver_num, 'green'))
                # Format repositories
                repo_lines = []
                repo_text = "(" + repos + ")"
                # Wrap the repository text if needed
                avail_width = col_widths[len(package_data)] - (2 * padding)
                if len(repo_text) <= avail_width:
                    repo_lines.append(colorize(repo_text, 'green'))
                else:
                    # Handle wrapping for repositories
                    repo_parts = repos.split(', ')
                    current_line = "("
                    for repo in repo_parts:
                        if len(current_line) + len(repo) + 2 <= avail_width:
                            if current_line != "(":
                                current_line += ", "
                            current_line += repo
                        else:
                            if current_line != "(":
                                current_line += ")"
                                repo_lines.append(colorize(current_line, 'green'))
                            current_line = " " + repo
                    if current_line != "(":
                        current_line += ")" if not current_line.startswith(" ") else ""
                        repo_lines.append(colorize(current_line, 'green'))
                # Combined version and repo lines
                package_data.append([version_parts[0]] + repo_lines)
            else:
                # Simple version string
                package_data.append([colorize(version, 'green')])
        # Determine the maximum number of lines needed
        max_lines = max(len(column) for column in package_data)
        # Print all lines for this package
        for line_idx in range(max_lines):
            row_cells = []
            for col_idx, col_data in enumerate(package_data):
                if line_idx < len(col_data):
                    # Actual content
                    content = col_data[line_idx]
                    content_plain = re.sub(r'\033\[\d+m', '', content)
                    # Calculate padding
                    left_pad = padding
                    right_pad = max(0, col_widths[col_idx] - len(content_plain) - left_pad)
                    cell = " " * left_pad + content + " " * right_pad
                else:
                    # Empty cell
                    cell = " " * col_widths[col_idx]
                row_cells.append(cell)
            print(" | ".join(row_cells))
        # Add a separator between packages
        if pkg_idx < len(args.packages) - 1:
            print("·" * total_width)
def print_list_format(output, headers, selected_pms, args, pkg_availability):
    """Print packages in a vertical list format for narrow terminals"""
    terminal_width = shutil.get_terminal_size().columns
    for pkg_idx, pkg in enumerate(args.packages):
        pkg_color = pkg_availability[pkg]
        versions = output.get(pkg, {})
        # Print the package header with color based on availability
        print(f"\n{colorize('Package:', 'bold')} {colorize(pkg, pkg_color)}")
        print("-" * min(40, terminal_width - 2))
        # Print versions for each package manager
        for pm in selected_pms:
            pm_name = headers[selected_pms.index(pm) + 1]  # Get the friendly display name
            version = versions.get(pm, 'Not found')
            if version == 'Not found':
                print(f"{colorize(pm_name, 'header')}: {colorize('-', 'red')}")
            else:
                # Extract version and repo information
                match = re.match(r'(.*?)\s+\((.*)\)$', version)
                if match:
                    ver_num, repos = match.groups()
                    # Handle long repository lists with wrapping
                    if len(pm_name) + len(ver_num) + len(repos) + 5 > terminal_width:
                        print(f"{colorize(pm_name, 'header')}: {colorize(ver_num, 'green')}")
                        # Wrap repositories with proper indentation
                        wrapper = textwrap.TextWrapper(
                            width=terminal_width - 4,
                            initial_indent="  ",
                            subsequent_indent="  "
                        )
                        wrapped = wrapper.fill(f"({repos})")
                        print(colorize(wrapped, 'green'))
                    else:
                        print(f"{colorize(pm_name, 'header')}: {colorize(ver_num, 'green')} ({repos})")
                else:
                    print(f"{colorize(pm_name, 'header')}: {colorize(version, 'green')}")
        # Add a separator between packages
        if pkg_idx < len(args.packages) - 1:
            print("\n" + "-" * min(40, terminal_width - 2))
def main():
    parser = argparse.ArgumentParser(description='Package search tool')
    parser.add_argument('--all', action='store_true')
    parser.add_argument('--apt', action='store_true')
    parser.add_argument('--dnf', action='store_true')
    parser.add_argument('--pacman', action='store_true')
    parser.add_argument('--apk', action='store_true')
    parser.add_argument('--zypper', action='store_true')
    parser.add_argument('--aur', action='store_true')
    parser.add_argument('packages', nargs='+')
    args = parser.parse_args()
    selected_pms = [pm for pm, flag in [
        ('apt', args.apt or args.all),
        ('dnf', args.dnf or args.all),
        ('pacman', args.pacman or args.all),
        ('apk', args.apk or args.all),
        ('zypper', args.zypper or args.all),
        ('aur', args.aur or args.all)
    ] if flag]
    if not selected_pms:
        print(colorize("Error: No package managers selected", 'red'))
        return
    results = {}
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(fetch_package_data, pkg): pkg for pkg in args.packages}
        for future in as_completed(futures):
            pkg = futures[future]
            try:
                data = future.result()
                results[pkg] = data or []
            except Exception as e:
                print(colorize(f"Error processing {pkg}: {str(e)}", 'red'))
                results[pkg] = []
    output = {}
    for pkg, entries in results.items():
        pm_versions = {pm: {'version': '', 'repos': set(), 'key': []} for pm in selected_pms}
        for entry in entries:
            repo = entry.get('repo', '')
            version = entry.get('version', 'N/A')
            pm = get_package_manager(repo)
            if pm in selected_pms and version != 'N/A':
                repo_fmt = format_repository(repo)
                current_key = version_key(version)
                stored = pm_versions[pm]
                if not stored['key'] or current_key > stored['key']:
                    stored['version'] = version
                    stored['repos'] = {repo_fmt}
                    stored['key'] = current_key
                elif current_key == stored['key']:
                    stored['repos'].add(repo_fmt)
        output[pkg] = {}
        for pm in selected_pms:
            data = pm_versions[pm]
            if data['version']:
                repos = ', '.join(sorted(data['repos']))
                output[pkg][pm] = f"{data['version']} ({repos})"
            else:
                output[pkg][pm] = 'Not found'
    print_table(output, ['Package'] + selected_pms, selected_pms, args)


if __name__ == '__main__':
    main()
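
Typical invocations, for reference (the package names are just examples):

# ./checkpkg.py --apt --pacman --aur neovim
#     query Debian/Ubuntu, Arch, and the AUR for one package
# ./checkpkg.py --all neovim tmux ripgrep
#     query every supported package manager for several packages at once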

View file

@@ -0,0 +1,16 @@
print('BANK OF DEDEDEX')
pin1 = int(input("Enter your first PIN: "))
pin2 = int(input("Enter your PIN again, to confirm: "))
if pin1 == pin2:
    password = pin1
    pin = int(input('Enter your PIN: '))
    while pin != password:
        pin = int(input('Incorrect PIN. Enter your PIN again: '))
    if pin == password:
        print('PIN was entered successfully!')

76
python/dev/fonts.py Executable file

@@ -0,0 +1,76 @@
#!/usr/bin/env python3
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from colorama import init, Fore, Style
import requests
# Initialize colorama
init(autoreset=True)
# List of font repositories
FONT_REPOS = {
    "JetBrainsMono": "https://github.com/ryanoasis/nerd-fonts/releases/download/v3.3.0/JetBrainsMono.zip",
    "CaskaydiaCove": "https://github.com/ryanoasis/nerd-fonts/releases/download/v3.3.0/CascadiaCode.zip",
    # Add more repositories here
}


def display_menu(options, existing_fonts):
    print(Fore.CYAN + "Select Font Repositories to Download:")
    for i, option in enumerate(options, 1):
        icon = Fore.GREEN + "✔" if option in existing_fonts else Fore.RED + " "
        print(f"{i}. {option} {icon}")
    print(Fore.YELLOW + "0. Exit")


def get_user_selection(options):
    selected_indices = []
    while True:
        try:
            choices = input(Fore.YELLOW + "Enter the numbers of your choices separated by spaces (q to quit): ")
            if choices.strip().lower() == "q":
                break
            indices = [int(choice.strip()) - 1 for choice in choices.split() if choice.strip().isdigit()]
            for index in indices:
                if 0 <= index < len(options):
                    selected_indices.append(index)
                else:
                    print(Fore.RED + f"Invalid choice: {index + 1}. Please try again.")
        except ValueError:
            print(Fore.RED + "Invalid input. Please enter numbers separated by spaces.")
    return selected_indices


def download_font(url, download_path):
    response = requests.get(url, timeout=60)  # Avoid hanging forever on a stalled connection
    response.raise_for_status()  # Treat HTTP errors as download failures
    with open(download_path, 'wb') as file:
        file.write(response.content)


def clone_repos(selected_repos, base_clone_directory):
    with ThreadPoolExecutor(max_workers=4) as executor:
        future_to_repo = {
            executor.submit(download_font, FONT_REPOS[repo], os.path.join(base_clone_directory, f"{repo}.zip")): repo
            for repo in selected_repos
        }
        for future in as_completed(future_to_repo):
            repo = future_to_repo[future]
            try:
                future.result()
                print(Fore.GREEN + f"Successfully downloaded {repo}")
            except Exception as e:
                print(Fore.RED + f"Failed to download {repo}: {e}")


def check_existing_fonts(clone_directory):
    return {repo for repo in FONT_REPOS if os.path.exists(os.path.join(clone_directory, f"{repo}.zip"))}


def main():
    base_clone_directory = os.path.expanduser("~/.local/share/fonts/")
    os.makedirs(base_clone_directory, exist_ok=True)
    existing_fonts = check_existing_fonts(base_clone_directory)
    options = list(FONT_REPOS.keys())
    display_menu(options, existing_fonts)
    selected_indices = get_user_selection(options)
    selected_repos = [options[i] for i in selected_indices if options[i] not in existing_fonts]
    clone_repos(selected_repos, base_clone_directory)


if __name__ == "__main__":
    main()
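
The script stores the archives without unpacking them. A minimal follow-up sketch for making the fonts usable, assuming the zips can be extracted in place and that fc-cache is installed (fonts.py itself does neither):

# Hypothetical post-download step, not part of fonts.py above.
import os
import subprocess
import zipfile

fonts_dir = os.path.expanduser("~/.local/share/fonts/")
for name in os.listdir(fonts_dir):
    if name.endswith(".zip"):
        # Unpack each archive into a subdirectory named after it
        with zipfile.ZipFile(os.path.join(fonts_dir, name)) as archive:
            archive.extractall(os.path.join(fonts_dir, name[:-4]))
subprocess.run(["fc-cache", "-f", fonts_dir], check=False)  # refresh the font cache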

5
python/dev/test.py Normal file

@@ -0,0 +1,5 @@
from datetime import datetime
current_year = datetime.now().year
print(current_year)

42
python/mergefiles.py Normal file

@@ -0,0 +1,42 @@
import os
from datetime import datetime
def merge_files(file_paths):
    # Dictionary to hold file contents by extension
    files_by_extension = {}
    # Read each file and group contents by extension
    for file_path in file_paths:
        _, ext = os.path.splitext(file_path)
        if ext not in files_by_extension:
            files_by_extension[ext] = []
        with open(file_path, 'r') as file:
            files_by_extension[ext].append(file.read())
    # Ensure the .old directory exists
    old_dir = '.old'
    if not os.path.exists(old_dir):
        os.makedirs(old_dir)
    # Write merged contents to new files
    for ext, contents in files_by_extension.items():
        current_date = datetime.now().strftime("%Y-%m-%d")
        merged_file_path = f'{current_date}{ext}'
        # Move an existing merged file to the .old directory first
        if os.path.exists(merged_file_path):
            os.rename(merged_file_path, os.path.join(old_dir, f'{current_date}{ext}'))
        with open(merged_file_path, 'w') as merged_file:
            for content in contents:
                merged_file.write(content)


if __name__ == "__main__":
    # Example usage: specify the file paths you want to merge
    file_paths = [
        '/test/first.csv',
        '/test/second.csv',
        '/test/third.csv'
    ]
    merge_files(file_paths)
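
A quick illustration of the naming scheme (the dates depend on when the script runs; these are examples):

# merge_files(["a.csv", "b.csv", "notes.txt"]) run on 2025-04-14 writes:
#   2025-04-14.csv  (contents of a.csv followed by b.csv)
#   2025-04-14.txt  (contents of notes.txt)
# Any same-named merged file from an earlier run is moved into .old/ first.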