init: pristine aerc 0.20.0 source
This commit is contained in:
Executable
+268
@@ -0,0 +1,268 @@
|
||||
#!/usr/bin/env python3
|
||||
# SPDX-License-Identifier: MIT
|
||||
# Copyright (c) 2023 Robin Jarry
|
||||
|
||||
"""
|
||||
Query a CardDAV server for contact names and emails.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import configparser
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import xml.etree.ElementTree as xml
|
||||
from urllib import error, parse, request
|
||||
|
||||
|
||||
def main():
|
||||
try:
|
||||
args = parse_args()
|
||||
|
||||
C = "urn:ietf:params:xml:ns:carddav"
|
||||
D = "DAV:"
|
||||
xml.register_namespace("C", C)
|
||||
xml.register_namespace("D", D)
|
||||
|
||||
# perform the actual address book query
|
||||
query = xml.Element(f"{{{C}}}addressbook-query")
|
||||
prop = xml.SubElement(query, f"{{{D}}}prop")
|
||||
xml.SubElement(prop, f"{{{D}}}getetag")
|
||||
data = xml.SubElement(prop, f"{{{C}}}address-data")
|
||||
xml.SubElement(data, f"{{{C}}}prop", name="FN")
|
||||
xml.SubElement(data, f"{{{C}}}prop", name="EMAIL")
|
||||
limit = xml.SubElement(query, f"{{{C}}}limit")
|
||||
xml.SubElement(limit, f"{{{C}}}nresults").text = str(args.limit)
|
||||
filtre = xml.SubElement(query, f"{{{C}}}filter", test="anyof")
|
||||
for term in args.terms:
|
||||
for attr in "FN", "EMAIL", "NICKNAME", "ORG", "TITLE":
|
||||
prop = xml.SubElement(filtre, f"{{{C}}}prop-filter", name=attr)
|
||||
match = xml.SubElement(
|
||||
prop, f"{{{C}}}text-match", {"match-type": "contains"}
|
||||
)
|
||||
match.text = term
|
||||
data = http_request_xml(
|
||||
"REPORT",
|
||||
args.server_url,
|
||||
query,
|
||||
username=args.username,
|
||||
password=args.password,
|
||||
debug=args.verbose,
|
||||
Depth="1",
|
||||
)
|
||||
for vcard in data.iterfind(f".//{{{C}}}address-data"):
|
||||
for name, email in parse_vcard(vcard.text.strip()):
|
||||
print(f"{email}\t{name}")
|
||||
|
||||
except Exception as e:
|
||||
if isinstance(e, error.HTTPError):
|
||||
if args.verbose:
|
||||
debug_response(e.fp)
|
||||
e = e.fp.read().decode()
|
||||
print(f"error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def http_request_xml(
|
||||
method: str,
|
||||
url: str,
|
||||
data: xml.Element,
|
||||
username: str = None,
|
||||
password: str = None,
|
||||
debug: bool = False,
|
||||
**headers,
|
||||
) -> xml.Element:
|
||||
req = request.Request(
|
||||
url=url,
|
||||
method=method,
|
||||
headers={
|
||||
"Content-Type": 'text/xml; charset="utf-8"',
|
||||
**headers,
|
||||
},
|
||||
data=xml.tostring(data, encoding="utf-8", xml_declaration=True),
|
||||
)
|
||||
if username is not None and password is not None:
|
||||
auth = f"{username}:{password}"
|
||||
auth = base64.standard_b64encode(auth.encode("utf-8")).decode("ascii")
|
||||
req.add_header("Authorization", f"Basic {auth}")
|
||||
|
||||
if debug:
|
||||
uri = parse.urlparse(req.full_url)
|
||||
print(f"> {req.method} {uri.path} HTTP/1.1", file=sys.stderr)
|
||||
print(f"> Host: {uri.hostname}", file=sys.stderr)
|
||||
for name, value in req.headers.items():
|
||||
print(f"> {name}: {value}", file=sys.stderr)
|
||||
print(f"{req.data.decode('utf-8')}\n", file=sys.stderr)
|
||||
|
||||
with request.urlopen(req) as resp:
|
||||
data = resp.read().decode("utf-8")
|
||||
if debug:
|
||||
debug_response(resp)
|
||||
print(f"{data}", file=sys.stderr)
|
||||
|
||||
return xml.fromstring(data)
|
||||
|
||||
|
||||
def debug_response(resp):
|
||||
print(f"< HTTP/1.1 {resp.code}", file=sys.stderr)
|
||||
for name, value in resp.headers.items():
|
||||
print(f"< {name}: {value}", file=sys.stderr)
|
||||
|
||||
|
||||
def parse_vcard(txt):
|
||||
lines = txt.splitlines()
|
||||
if len(lines) < 4 or lines[0] != "BEGIN:VCARD" or lines[-1] != "END:VCARD":
|
||||
return
|
||||
name = None
|
||||
emails = []
|
||||
for line in lines[1:-1]:
|
||||
if line.startswith("FN:"):
|
||||
name = line[len("FN:") :].replace("\\,", ",")
|
||||
continue
|
||||
match = re.match(r"^(?:ITEM\d+\.)?EMAIL(?:;[\w-]+=[^;:]+)*:(.+@.+)$", line)
|
||||
if match:
|
||||
email = match.group(1).lower().replace("\\,", ",")
|
||||
if email not in emails:
|
||||
if "TYPE=pref" in line or "PREF=1" in line:
|
||||
emails.insert(0, email)
|
||||
else:
|
||||
emails.append(email)
|
||||
if name is not None:
|
||||
for e in emails:
|
||||
yield name, e
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument(
|
||||
"-l",
|
||||
"--limit",
|
||||
default=10,
|
||||
type=int,
|
||||
help="""
|
||||
Maximum number of results returned by the server (default: 10).
|
||||
If the server does not support limiting, this will be disregarded.
|
||||
""",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-v",
|
||||
"--verbose",
|
||||
action="store_true",
|
||||
help="""
|
||||
Print debug info on stderr.
|
||||
""",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-c",
|
||||
"--config-file",
|
||||
metavar="FILE",
|
||||
default=os.path.expanduser("~/.config/aerc/accounts.conf"),
|
||||
help="""
|
||||
INI configuration file from which to read the CardDAV URL endpoint
|
||||
(default: ~/.config/aerc/accounts.conf).
|
||||
""",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-S",
|
||||
"--config-section",
|
||||
metavar="SECTION",
|
||||
help="""
|
||||
INI configuration section where to find CONFIG_KEY. By default the
|
||||
first section where CONFIG_KEY is found will be used.
|
||||
""",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-k",
|
||||
"--config-key-source",
|
||||
metavar="KEY_SOURCE",
|
||||
default="carddav-source",
|
||||
help="""
|
||||
INI configuration key to lookup in CONFIG_SECTION from CONFIG_FILE.
|
||||
The value must respect the following format:
|
||||
https?://USERNAME[:PASSWORD]@HOSTNAME/PATH/TO/ADDRESSBOOK.
|
||||
Both USERNAME and PASSWORD must be percent encoded.
|
||||
""",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-C",
|
||||
"--config-key-cred-cmd",
|
||||
metavar="KEY_CRED_CMD",
|
||||
default="carddav-source-cred-cmd",
|
||||
help="""
|
||||
INI configuration key to lookup in CONFIG_SECTION from CONFIG_FILE. The
|
||||
value is a command that will be used to determine PASSWORD if it is not
|
||||
present in CONFIG_KEY_SOURCE.
|
||||
""",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-s",
|
||||
"--server-url",
|
||||
help="""
|
||||
CardDAV server URL endpoint. Overrides configuration file.
|
||||
""",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-u",
|
||||
"--username",
|
||||
help="""
|
||||
Username to authenticate on the server. Overrides configuration file.
|
||||
""",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-p",
|
||||
"--password",
|
||||
help="""
|
||||
Password for the specified user. Overrides configuration file.
|
||||
""",
|
||||
)
|
||||
parser.add_argument(
|
||||
"terms",
|
||||
nargs="+",
|
||||
metavar="TERM",
|
||||
help="""
|
||||
Search term. Will be used to search contacts from their FN (formatted
|
||||
name), EMAIL, NICKNAME, ORG (company) and TITLE fields.
|
||||
""",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
cfg = configparser.RawConfigParser(strict=False)
|
||||
cfg.read([args.config_file])
|
||||
source = cred_cmd = None
|
||||
if args.config_section:
|
||||
source = cfg.get(args.config_section, args.config_key_source, fallback=None)
|
||||
cred_cmd = cfg.get(args.config_section, args.config_key_cred_cmd, fallback=None)
|
||||
else:
|
||||
for sec in cfg.sections():
|
||||
source = cfg.get(sec, args.config_key_source, fallback=None)
|
||||
if source is not None:
|
||||
cred_cmd = cfg.get(sec, args.config_key_cred_cmd, fallback=None)
|
||||
break
|
||||
if source is not None:
|
||||
try:
|
||||
u = parse.urlparse(source)
|
||||
if args.username is None and u.username is not None:
|
||||
args.username = parse.unquote(u.username)
|
||||
if args.password is None and u.password is not None:
|
||||
args.password = parse.unquote(u.password)
|
||||
if not args.password and cred_cmd is not None:
|
||||
args.password = subprocess.check_output(
|
||||
cred_cmd, shell=True, text=True, encoding="utf-8"
|
||||
).strip()
|
||||
if args.server_url is None:
|
||||
args.server_url = f"{u.scheme}://{u.hostname}"
|
||||
if u.port is not None:
|
||||
args.server_url += f":{u.port}"
|
||||
args.server_url += u.path
|
||||
except ValueError as e:
|
||||
parser.error(f"{args.config_file}: {e}")
|
||||
if args.server_url is None:
|
||||
parser.error("SERVER_URL is required")
|
||||
|
||||
return args
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user