From bcd12630e1b322609f4682eda94c98d8b9dbe925 Mon Sep 17 00:00:00 2001 From: dirkf Date: Sun, 3 Mar 2024 23:21:10 +0000 Subject: [PATCH] Align code with yt-dlp --- test/test_utils.py | 8 -------- youtube_dl/utils.py | 33 +++++++++++++++++++-------------- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/test/test_utils.py b/test/test_utils.py index 9cad656b0..90d64b581 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -256,14 +256,6 @@ class TestUtil(unittest.TestCase): self.assertEqual(sanitize_url('https://foo.bar'), 'https://foo.bar') self.assertEqual(sanitize_url('foo bar'), 'foo bar') - def test_extract_user_pass(self): - self.assertEqual(extract_user_pass('http://foo.bar'), ('http://foo.bar', None, None)) - self.assertEqual(extract_user_pass('http://:foo.bar'), ('http://:foo.bar', None, None)) - self.assertEqual(extract_user_pass('http://@foo.bar'), ('http://foo.bar', '', '')) - self.assertEqual(extract_user_pass('http://:pass@foo.bar'), ('http://foo.bar', '', 'pass')) - self.assertEqual(extract_user_pass('http://user:@foo.bar'), ('http://foo.bar', 'user', '')) - self.assertEqual(extract_user_pass('http://user:pass@foo.bar'), ('http://foo.bar', 'user', 'pass')) - def test_sanitized_Request(self): self.assertFalse(sanitized_Request('http://foo.bar').has_header('Authorization')) self.assertFalse(sanitized_Request('http://:foo.bar').has_header('Authorization')) diff --git a/youtube_dl/utils.py b/youtube_dl/utils.py index 3596a04ed..c249e7168 100644 --- a/youtube_dl/utils.py +++ b/youtube_dl/utils.py @@ -2182,23 +2182,28 @@ def sanitize_url(url): return url -def extract_user_pass(url): - parts = compat_urlparse.urlsplit(url) - username = parts.username - password = parts.password - if username is not None: - if password is None: - password = '' - netloc = parts.hostname - if parts.port is not None: - netloc = parts.hostname + ':' + parts.port - parts = parts._replace(netloc=netloc) - url = compat_urlparse.urlunsplit(parts) - return url, username, password +def extract_basic_auth(url): + parts = compat_urllib_parse.urlsplit(url) + if parts.username is None: + return url, None + url = compat_urllib_parse.urlunsplit(parts._replace(netloc=( + parts.hostname if parts.port is None + else '%s:%d' % (parts.hostname, parts.port)))) + auth_payload = base64.b64encode( + ('%s:%s' % (parts.username, parts.password or '')).encode('utf-8')) + return url, 'Basic {0}'.format(auth_payload.decode('ascii')) def sanitized_Request(url, *args, **kwargs): - return compat_urllib_request.Request(escape_url(sanitize_url(url)), *args, **kwargs) + url, auth_header = extract_basic_auth(escape_url(sanitize_url(url))) + if auth_header is not None: + headers = args[1] if len(args) > 1 else kwargs.get('headers') + headers = headers or {} + headers['Authorization'] = auth_header + if len(args) <= 1 and kwargs.get('headers') is None: + kwargs['headers'] = headers + kwargs = compat_kwargs(kwargs) + return compat_urllib_request.Request(url, *args, **kwargs) def expand_path(s):