-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
Signed-off-by: Henry Schreiner <[email protected]>
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,8 @@ | |
|
||
import copy | ||
import dataclasses | ||
import email.message | ||
import email.policy | ||
import email.utils | ||
import os | ||
import os.path | ||
|
@@ -113,46 +115,36 @@ class ConfigurationWarning(UserWarning): | |
"""Warnings about backend metadata.""" | ||
|
||
|
||
class RFC822Message: | ||
"""Python-flavored RFC 822 message implementation.""" | ||
|
||
__slots__ = ('_headers', '_body') | ||
|
||
def __init__(self) -> None: | ||
self._headers: list[tuple[str, str]] = [] | ||
self._body: str | None = None | ||
@dataclasses.dataclass | ||
class _SmartMessageSetter: | ||
""" | ||
This provides a nice internal API for setting values in an RFC822Message to | ||
reduce boilerplate. | ||
def items(self) -> list[tuple[str, str]]: | ||
return self._headers.copy() | ||
If a value is None, do nothing. | ||
If a value contains a newline, indent it (may produce a warning in the future). | ||
""" | ||
|
||
def get_all(self, name: str) -> None | list[str]: | ||
return [v for k, v in self.items() if k == name] | ||
message: email.message.EmailMessage | ||
|
||
def __setitem__(self, name: str, value: str | None) -> None: | ||
if not value: | ||
return | ||
self._headers.append((name, value)) | ||
if '\n' in value: | ||
value = value.replace('\n', '\n ') | ||
self.message[name] = value | ||
|
||
def __str__(self) -> str: | ||
text = '' | ||
for name, entry in self.items(): | ||
lines = entry.strip('\n').split('\n') | ||
text += f'{name}: {lines[0]}\n' | ||
for line in lines[1:]: | ||
text += ' ' * 8 + line + '\n' | ||
text += '\n' | ||
if self._body: | ||
text += self._body | ||
return text | ||
|
||
def __bytes__(self) -> bytes: | ||
return str(self).encode() | ||
class RFC822Message(email.message.EmailMessage): | ||
"""Python-flavored RFC 822 message implementation.""" | ||
|
||
def get_payload(self) -> str | None: | ||
return self._body | ||
__slots__ = () | ||
|
||
def set_payload(self, body: str) -> None: | ||
self._body = body | ||
def __init__(self) -> None: | ||
super().__init__(email.policy.compat32) | ||
|
||
def __str__(self) -> str: | ||
return bytes(self).decode('utf-8') | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
henryiii
Author
Collaborator
|
||
|
||
|
||
class DataFetcher: | ||
|
@@ -567,58 +559,60 @@ def as_rfc822(self) -> RFC822Message: | |
self.write_to_rfc822(message) | ||
return message | ||
|
||
def write_to_rfc822(self, message: RFC822Message) -> None: # noqa: C901 | ||
def write_to_rfc822(self, message: email.message.EmailMessage) -> None: # noqa: C901 | ||
This comment has been minimized.
Sorry, something went wrong.
dnicolodi
Contributor
|
||
self.validate() | ||
|
||
message['Metadata-Version'] = self.metadata_version | ||
message['Name'] = self.name | ||
smart_message = _SmartMessageSetter(message) | ||
|
||
smart_message['Metadata-Version'] = self.metadata_version | ||
smart_message['Name'] = self.name | ||
if not self.version: | ||
msg = 'Missing version field' | ||
raise ConfigurationError(msg) | ||
message['Version'] = str(self.version) | ||
smart_message['Version'] = str(self.version) | ||
# skip 'Platform' | ||
# skip 'Supported-Platform' | ||
if self.description: | ||
message['Summary'] = self.description | ||
message['Keywords'] = ','.join(self.keywords) | ||
smart_message['Summary'] = self.description | ||
smart_message['Keywords'] = ','.join(self.keywords) | ||
if 'homepage' in self.urls: | ||
message['Home-page'] = self.urls['homepage'] | ||
smart_message['Home-page'] = self.urls['homepage'] | ||
# skip 'Download-URL' | ||
message['Author'] = self._name_list(self.authors) | ||
message['Author-Email'] = self._email_list(self.authors) | ||
message['Maintainer'] = self._name_list(self.maintainers) | ||
message['Maintainer-Email'] = self._email_list(self.maintainers) | ||
smart_message['Author'] = self._name_list(self.authors) | ||
smart_message['Author-Email'] = self._email_list(self.authors) | ||
smart_message['Maintainer'] = self._name_list(self.maintainers) | ||
smart_message['Maintainer-Email'] = self._email_list(self.maintainers) | ||
if self.license: | ||
message['License'] = self.license.text | ||
smart_message['License'] = self.license.text | ||
for classifier in self.classifiers: | ||
message['Classifier'] = classifier | ||
smart_message['Classifier'] = classifier | ||
# skip 'Provides-Dist' | ||
# skip 'Obsoletes-Dist' | ||
# skip 'Requires-External' | ||
for name, url in self.urls.items(): | ||
message['Project-URL'] = f'{name.capitalize()}, {url}' | ||
smart_message['Project-URL'] = f'{name.capitalize()}, {url}' | ||
if self.requires_python: | ||
message['Requires-Python'] = str(self.requires_python) | ||
smart_message['Requires-Python'] = str(self.requires_python) | ||
for dep in self.dependencies: | ||
message['Requires-Dist'] = str(dep) | ||
smart_message['Requires-Dist'] = str(dep) | ||
for extra, requirements in self.optional_dependencies.items(): | ||
norm_extra = extra.replace('.', '-').replace('_', '-').lower() | ||
message['Provides-Extra'] = norm_extra | ||
smart_message['Provides-Extra'] = norm_extra | ||
for requirement in requirements: | ||
message['Requires-Dist'] = str( | ||
smart_message['Requires-Dist'] = str( | ||
self._build_extra_req(norm_extra, requirement) | ||
) | ||
if self.readme: | ||
if self.readme.content_type: | ||
message['Description-Content-Type'] = self.readme.content_type | ||
smart_message['Description-Content-Type'] = self.readme.content_type | ||
message.set_payload(self.readme.text) | ||
# Core Metadata 2.2 | ||
if self.metadata_version != '2.1': | ||
for field in self.dynamic: | ||
if field in ('name', 'version'): | ||
msg = f'Field cannot be dynamic: {field}' | ||
raise ConfigurationError(msg) | ||
message['Dynamic'] = field | ||
smart_message['Dynamic'] = field | ||
|
||
def _name_list(self, people: list[tuple[str, str | None]]) -> str: | ||
return ', '.join(name for name, email_ in people if not email_) | ||
|
Why do we need this?
email.message.EmailMessage
has a__str__
method that should be pretty much equivalent.