from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
from datetime import datetime
import re
import os
import locale
import yaml

# Set the locale so that locale-aware number formatting (the '{:n}' entry count) works
locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')


def read_yaml_settings(file_yaml):
    # If the yaml file exists
    if os.path.isfile(file_yaml):
        with open(file_yaml, 'r') as fOpen:
            return yaml.safe_load(fOpen)


def fetch_url(url):
    if not url:
        return
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:68.0) Gecko/20100101 Firefox/68.0'}
    print('[i] Fetching:', url)
    try:
        response = urlopen(Request(url, headers=headers))
    except HTTPError as e:
        print('[E] HTTP Error:', e.code, 'whilst fetching', url)
        return
    except URLError as e:
        print('[E] URL Error:', e.reason, 'whilst fetching', url)
        return
    # Read and decode
    response = response.read().decode('UTF-8').replace('\r\n', '\n')
    # If there is data
    if response:
        # Strip leading and trailing whitespace from each line
        response = '\n'.join(x.strip() for x in response.splitlines())
        # Return the hosts
        return response


def run_str_subs(string, dict_subs, precompiled=False):
    # Return None if the string or the substitution dict was not supplied
    if not string or not dict_subs:
        return
    # If the patterns aren't already compiled
    # (pre-compiling is worthwhile when calling this repeatedly in a loop)
    if not precompiled:
        # Add compiled regexps to dict
        dict_subs = {re.compile(rf'{k}', re.M): v for k, v in dict_subs.items()}
    # For each sub pattern
    for pattern, sub in dict_subs.items():
        # Apply the substitution
        string = pattern.sub(sub, string)
    return string
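# For instance, when running the same substitutions over many strings it can be
# worth compiling the patterns once and passing precompiled=True (an illustrative
# sketch; 'many_strings' is a hypothetical iterable, not something defined here):
#   subs = {re.compile(r'^#.*$', re.M): ''}
#   for text in many_strings:
#       text = run_str_subs(text, subs, precompiled=True)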


def sub_hosts(str_hosts):
    # Conditional exit if argument not supplied
    if not str_hosts:
        return
    # Construct substitution dict
    dict_subs = {
        # Remove the leading IP (dead-zone) from hosts-file entries
        r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}\s+': '',
        # Remove bare IP addresses
        r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$': '',
        # Remove any line that doesn't start a-z 0-9
        r'^[^a-z0-9].*': '',
        # Remove in-line comments
        r'[^\S\n]+#.*$': '',
        # Remove entries without a '.' (non-domains) or that start with
        # localhost. and don't have any subsequent dots
        r'^(?:(?![^.\n]+\.).*|localhost\.[^.\n]+)$': '',
        # Remove empty lines
        r'^[\t\s]*(?:\r?\n|\r)+': ''
    }
    str_hosts = run_str_subs(str_hosts, dict_subs).lower()
    return str_hosts
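# As a rough illustration of the intent (hypothetical hosts-file lines):
#   '0.0.0.0 ads.example.com  # tracker'  ->  'ads.example.com'
#   '127.0.0.1'                           ->  (removed)
#   'localhost.localdomain'               ->  (removed)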


def sub_regexps(str_regexps):
    # Conditional exit if argument not supplied
    if not str_regexps:
        return
    # Construct substitution dict
    dict_subs = {
        # Remove comments
        r'^#.*$': '',
        # Remove empty lines
        r'^[\t\s]*(?:\r?\n|\r)+': ''
    }
    str_regexps = run_str_subs(str_regexps, dict_subs)
    return str_regexps


def sub_filters(str_filters):
    # Conditional exit if argument not supplied
    if not str_filters:
        return
    # Construct substitution dict
    dict_subs = {
        # Remove anything that isn't a valid (for AdGuard Home)
        # restrictive / whitelist filter
        r'^(?!(?:@@)?\|\|[a-z0-9_.-]+\^(?:\||(?:\$(?:third-party|document)))?$).*$': '',
        # Remove $third-party or $document suffixes
        r'\$(?:third-party|document)$': '',
        # Remove IP addresses
        r'^\|\|(?:[0-9]{1,3}\.){3}[0-9]{1,3}\^$': '',
        # Remove empty lines
        r'^[\t\s]*(?:\r?\n|\r)+': ''
    }
    str_filters = run_str_subs(str_filters, dict_subs).lower()
    return str_filters
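# To illustrate which rules survive the pattern above (hypothetical rules):
#   '||ads.example.com^'              -> kept
#   '@@||cdn.example.net^|'           -> kept (whitelist)
#   '||ads.example.com^$third-party'  -> kept, suffix stripped
#   '||example.org^$script'           -> removed (unsupported modifier)
#   '/banner/*'                       -> removed (not a plain domain rule)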


def fetch_hosts(h_urls):
    if not h_urls:
        return
    set_hosts = set()
    # For each host file
    for url in h_urls:
        # Fetch and clean the hosts
        str_hosts = fetch_url(url)
        str_hosts = sub_hosts(str_hosts)
        # If no hosts were returned (or an error occurred fetching them)
        # jump to the next host file
        if not str_hosts:
            continue
        # Add each host to the set
        set_hosts.update(str_hosts.splitlines())
    return set_hosts


def convert_hosts_to_restrictive_filters(set_hosts):
    if not set_hosts:
        return
    # Create string from set_hosts
    str_hosts = '\n'.join(set_hosts)
    # Remove www prefixes,
    # providing there is at least one further dot (e.g. exclude www.be, www.fr)
    str_hosts = run_str_subs(str_hosts, {r'^www\.(?=(?:[^.\n]+\.){1,}[^.\n]+$)': ''})
    # Remove sub-domains
    # and convert back to filter format
    set_hosts = {f'||{x}^' for x in
                 remove_subdomains(set(str_hosts.splitlines()))}
    return set_hosts
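# A quick sketch of the conversion (hypothetical domains):
#   {'www.ads.example.com', 'ads.example.com', 'tracker.net'}
# should come out as
#   {'||ads.example.com^', '||tracker.net^'}
# since the www prefix is dropped, the resulting duplicate collapses in the set
# and the surviving domains are wrapped in AdGuard's '||domain^' syntax.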


def fetch_regexps(r_urls):
    if not r_urls:
        return
    set_regexps = set()
    for url in r_urls:
        # Fetch and clean the regexps
        str_regexps = fetch_url(url)
        str_regexps = sub_regexps(str_regexps)
        # Conditional skip
        if not str_regexps:
            continue
        # Add the regexps to the set in /regexp/ rule format
        set_regexps.update(f'/{r}/' for r in str_regexps.splitlines())
    return set_regexps


def fetch_filters(f_urls):
    if not f_urls:
        return
    set_filters = set()
    # For each filter list
    for url in f_urls:
        # Fetch and clean the filters
        str_filters = fetch_url(url)
        str_filters = sub_filters(str_filters)
        # If no filters were returned (or an error occurred fetching them)
        # jump to the next filter list
        if not str_filters:
            continue
        # Add each filter to the set
        set_filters.update(str_filters.splitlines())
    return set_filters


def parse_filters(set_hosts_and_filters, path_includes, file_filter_whitelist):
    if not set_hosts_and_filters:
        return
    set_restrictive_filters = set()
    set_unverified_whitelist = set()
    set_verified_whitelist = set()
    # If a filter whitelist has been provided
    if file_filter_whitelist:
        # Join the file path / name
        file_filter_whitelist = os.path.join(path_includes, file_filter_whitelist)
        # If the path exists and it is a file
        if os.path.isfile(file_filter_whitelist):
            # Add each line that's not a comment to the unverified whitelist set
            with open(file_filter_whitelist, 'r', encoding='UTF-8') as fOpen:
                set_unverified_whitelist.update(line for line in (line.strip() for line in fOpen)
                                                if line and not line.startswith(('!', '#')))
    # Filter pattern to match ||test.com^
    valid_filter_pattern = re.compile(r'^\|\|([a-z0-9_.-]+)\^$', flags=re.M)
    # Whitelist pattern to match @@||test.com^ or @@||test.com^|
    valid_whitelist_pattern = re.compile(r'^@@\|\|([a-z0-9_.-]+)\^\|?$', flags=re.M)
    # Convert filters to string format
    str_hosts_and_filters = '\n'.join(set_hosts_and_filters)
    # Extract valid restrictive filters
    list_valid_filters = valid_filter_pattern.findall(str_hosts_and_filters)
    # Extract valid whitelist filters
    list_valid_whitelist = valid_whitelist_pattern.findall(str_hosts_and_filters)
    # Add valid filters to set
    if list_valid_filters:
        set_restrictive_filters.update(list_valid_filters)
    # Add valid whitelist to set
    if list_valid_whitelist:
        set_unverified_whitelist.update(list_valid_whitelist)
    # If there are still checks required
    if set_unverified_whitelist:
        """
        At this point we will build a string with artificial markers.
        It is significantly faster to match against a whole string
        instead of iterating through two lists and comparing.
        """
        # Add exact matches to the verified whitelist
        set_verified_whitelist = set_restrictive_filters.intersection(set_unverified_whitelist)
        # If there were exact whitelist matches
        if set_verified_whitelist:
            # Remove them from the unverified whitelist
            set_unverified_whitelist.difference_update(set_verified_whitelist)
            # Remove them from the restrictive filters (we'll keep the whitelist
            # entry in case it's in other lists)
            set_restrictive_filters.difference_update(set_verified_whitelist)
        # If there are still items to process in set_unverified_whitelist
        if set_unverified_whitelist:
            # Add artificial markers: .something.com$ (checking for the existence of sub-domains)
            gen_match_filters = (f'.{x}$' for x in set_restrictive_filters)
            # Add artificial markers: ^something.com$ (so we can see where each
            # whitelist entry starts and ends)
            str_match_whitelist = '\n'.join(f'^{x}$' for x in set_unverified_whitelist)
            # Gather restrictive filters that match somewhere in the whitelist string
            filter_match_result = filter(lambda x: x in str_match_whitelist, gen_match_filters)
            # For each filter sub-domain marker that matched in the whitelist
            for match in filter_match_result:
                # For each whitelist entry
                for whitelist in str_match_whitelist.splitlines():
                    # e.g. is '.test.com$' in '^sub.test.com$'?
                    if match in whitelist:
                        set_verified_whitelist.add(whitelist)
    # If there were verified whitelist items
    if set_verified_whitelist:
        # Build substitution dict ready to remove
        # the artificial markers
        dict_subs = {r'^(?:\^|\.)': '', r'\$$': ''}
        # Remove start / end markers and
        # add @@|| prefix and ^ suffix to verified whitelist matches
        set_verified_whitelist = {f'@@||{x}^' for x in
                                  run_str_subs('\n'.join(set_verified_whitelist), dict_subs).splitlines()}
    # Remove sub-domains again in case a filter list introduced
    # a parent domain
    # Add || prefix and ^ suffix to the restrictive filters
    set_restrictive_filters = {f'||{x}^' for x in remove_subdomains(set_restrictive_filters)}
    return set.union(set_restrictive_filters, set_verified_whitelist)
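# A small illustration of the marker trick above (hypothetical domains): with
# 'example.com' among the restrictive filters and 'ads.example.com' in the
# unverified whitelist, the markers become '.example.com$' and
# '^ads.example.com$'; the substring test then flags the whitelist entry as one
# that genuinely overrides a blocked parent domain, so it is returned as
# '@@||ads.example.com^'.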


def output_required(set_content, path_output, file):
    # Initialise local_content
    set_local_content = set()
    # Store full file path
    file_path = os.path.join(path_output, file)
    # If the file already exists in the output directory
    if os.path.isfile(file_path):
        # Fetch the local file
        # without the added header comments
        with open(file_path, 'r', encoding='UTF-8') as fOpen:
            set_local_content.update(line for line in (line.strip() for line in fOpen)
                                     if line and not line.startswith(('!', '#')))
        # If the local copy was empty,
        # output the file
        if not set_local_content:
            return True
        # If the local copy is identical to
        # the generated output
        if set_content == set_local_content:
            print('[i] No updates required for', file)
            return False
        else:
            return True
    # File does not exist
    else:
        return True


def identify_wildcards(hosts, limit=50):
    # Conditionally exit if hosts not provided
    if not hosts:
        return
    # Create dict to store wildcards (domain -> sub-domain count)
    wildcards = {}
    # Set prev tracker to None
    prev = None
    # Set sub-domain counter to 0
    i = 0
    # Reverse each host
    rev_hosts = [host[::-1] for host in hosts]
    # Sort reversed hosts
    rev_hosts.sort()
    # For each host
    for host in rev_hosts:
        # If the domain is not a subdomain of the previous
        # iteration
        if not host.startswith(f'{prev}.'):
            # If the previous domain had at least `limit` subdomains
            if i >= limit:
                # Add to wildcards dict
                wildcards[prev[::-1]] = i
            # Set previous domain to the current iteration
            prev = host
            # Reset the counter
            i = 0
        else:
            # Current iteration is a subdomain of the last
            # so increment the counter
            i += 1
    # Check the final domain group too
    # (nothing follows it to trigger the check above)
    if prev and i >= limit:
        wildcards[prev[::-1]] = i
    # Sort dict on sub-domain count (desc)
    wildcards = {k: v for k, v in sorted(wildcards.items(), key=lambda x: x[1], reverse=True)}
    return wildcards
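# Rough illustration (hypothetical hosts, limit=3): given
#   {'cdn.example.com', 'a.cdn.example.com', 'b.cdn.example.com', 'c.cdn.example.com'}
# the result should be {'cdn.example.com': 3}, flagging 'cdn.example.com' as a
# wildcard candidate. Note the parent domain itself must be present for its
# sub-domains to be counted against it.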


def remove_subdomains(hosts):
    # Conditionally exit if hosts not provided
    if not hosts:
        return
    # Create set to store the cleaned hosts
    cleaned_hosts = set()
    # Set prev tracker to None
    prev = None
    # Reverse each host
    rev_hosts = [host[::-1] for host in hosts]
    # Sort reversed hosts
    rev_hosts.sort()
    # For each host
    for host in rev_hosts:
        # If the domain is not a subdomain of the previously
        # kept domain
        if not host.startswith(f'{prev}.'):
            # Conditionally set rev_host depending on prev
            rev_host = prev[::-1] if prev else host[::-1]
            # Add to host set
            cleaned_hosts.add(rev_host)
            # Set previous domain to the current iteration
            prev = host
    # Make sure the final parent domain is also kept
    # (nothing follows it to trigger the add above)
    if prev:
        cleaned_hosts.add(prev[::-1])
    return cleaned_hosts
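# A quick illustration (hypothetical hosts): given
#   {'example.com', 'a.example.com', 'b.a.example.com', 'tracker.net'}
# the reversed-and-sorted walk should reduce this to
#   {'example.com', 'tracker.net'}
# because both sub-domain entries are already covered by 'example.com'.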


class Output:
    def __init__(self, path_base: str, path_output: str, path_includes: str, arr_sources: list, file_header: str,
                 list_output: list, file_name: str, file_type: int, description: str):
        self.path_base = path_base
        self.path_output = path_output
        self.path_includes = path_includes
        self.arr_sources = arr_sources
        self.file_header = file_header
        self.list_output = list_output
        self.file_name = file_name
        self.file_type = file_type
        self.description = description

    def build_header(self):
        # Store header file path
        file_header = os.path.join(self.path_includes, self.file_header)
        # If header file exists
        if os.path.isfile(file_header):
            # Open it
            with open(file_header, 'r', encoding='UTF-8') as fOpen:
                # Add each line to list if not blank
                arr_header = [line for line in (line.strip() for line in fOpen) if line]
            # If the header file is not empty
            if arr_header:
                # Join the header lines into a string
                str_header = '\n'.join(arr_header)
                # Get the current timestamp with timezone
                time_timestamp = datetime.now().astimezone().strftime('%d-%m-%Y %H:%M %Z')
                # Get the appropriate comment character
                c = '!' if self.file_type == 2 else '#'
                # Set default for description if none is set
                description = self.description or 'None'
                # Fetch the sources and put into string
                str_sources = '\n'.join([f'{c} {source}' for source in self.arr_sources]) or f'{c} None'
                # Set the replacement criteria
                dict_subs = {
                    '{c}': c,
                    '{title}': f'AdguardHome - {self.file_name}',
                    '{description}': description,
                    '{time_timestamp}': time_timestamp,
                    '{count}': f'{len(self.list_output):n}',
                    f'{c} {{arr_sources}}': str_sources
                }
                # Run the replacements
                for k, v in dict_subs.items():
                    str_header = str_header.replace(k, v)
                return str_header
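
    # For reference, a header include along these lines (a hypothetical example;
    # the repository's actual include file may differ) would work with the
    # substitutions above:
    #   {c} Title: {title}
    #   {c} Description: {description}
    #   {c} Last updated: {time_timestamp}
    #   {c} Entry count: {count}
    #   {c} Sources:
    #   {c} {arr_sources}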

    def output_file(self):
        # Store the output path
        path_output = self.path_output
        # Output file path
        out_file = os.path.join(path_output, self.file_name)
        # Double check output folder exists
        if not os.path.exists(path_output):
            os.makedirs(path_output)
        # Build the header (may be None if no header file was found)
        str_header = self.build_header()
        # Output the file
        print(f'[i] Outputting {self.file_name} to:', path_output)
        with open(out_file, 'w', newline='\n', encoding='UTF-8') as f:
            if str_header:
                # Output header
                f.write(f'{str_header}\n')
            # Output hosts
            f.writelines(f'{host}\n' for host in self.list_output)