Align code with yt-dlp

This commit is contained in:
dirkf 2024-03-03 23:21:10 +00:00 committed by GitHub
parent 02c80a0215
commit bcd12630e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 19 additions and 22 deletions

View File

@ -256,14 +256,6 @@ class TestUtil(unittest.TestCase):
self.assertEqual(sanitize_url('https://foo.bar'), 'https://foo.bar') self.assertEqual(sanitize_url('https://foo.bar'), 'https://foo.bar')
self.assertEqual(sanitize_url('foo bar'), 'foo bar') self.assertEqual(sanitize_url('foo bar'), 'foo bar')
def test_extract_user_pass(self):
self.assertEqual(extract_user_pass('http://foo.bar'), ('http://foo.bar', None, None))
self.assertEqual(extract_user_pass('http://:foo.bar'), ('http://:foo.bar', None, None))
self.assertEqual(extract_user_pass('http://@foo.bar'), ('http://foo.bar', '', ''))
self.assertEqual(extract_user_pass('http://:pass@foo.bar'), ('http://foo.bar', '', 'pass'))
self.assertEqual(extract_user_pass('http://user:@foo.bar'), ('http://foo.bar', 'user', ''))
self.assertEqual(extract_user_pass('http://user:pass@foo.bar'), ('http://foo.bar', 'user', 'pass'))
def test_sanitized_Request(self): def test_sanitized_Request(self):
self.assertFalse(sanitized_Request('http://foo.bar').has_header('Authorization')) self.assertFalse(sanitized_Request('http://foo.bar').has_header('Authorization'))
self.assertFalse(sanitized_Request('http://:foo.bar').has_header('Authorization')) self.assertFalse(sanitized_Request('http://:foo.bar').has_header('Authorization'))

View File

@ -2182,23 +2182,28 @@ def sanitize_url(url):
return url return url
def extract_user_pass(url): def extract_basic_auth(url):
parts = compat_urlparse.urlsplit(url) parts = compat_urllib_parse.urlsplit(url)
username = parts.username if parts.username is None:
password = parts.password return url, None
if username is not None: url = compat_urllib_parse.urlunsplit(parts._replace(netloc=(
if password is None: parts.hostname if parts.port is None
password = '' else '%s:%d' % (parts.hostname, parts.port))))
netloc = parts.hostname auth_payload = base64.b64encode(
if parts.port is not None: ('%s:%s' % (parts.username, parts.password or '')).encode('utf-8'))
netloc = parts.hostname + ':' + parts.port return url, 'Basic {0}'.format(auth_payload.decode('ascii'))
parts = parts._replace(netloc=netloc)
url = compat_urlparse.urlunsplit(parts)
return url, username, password
def sanitized_Request(url, *args, **kwargs): def sanitized_Request(url, *args, **kwargs):
return compat_urllib_request.Request(escape_url(sanitize_url(url)), *args, **kwargs) url, auth_header = extract_basic_auth(escape_url(sanitize_url(url)))
if auth_header is not None:
headers = args[1] if len(args) > 1 else kwargs.get('headers')
headers = headers or {}
headers['Authorization'] = auth_header
if len(args) <= 1 and kwargs.get('headers') is None:
kwargs['headers'] = headers
kwargs = compat_kwargs(kwargs)
return compat_urllib_request.Request(url, *args, **kwargs)
def expand_path(s): def expand_path(s):