aboutsummaryrefslogtreecommitdiffstats
path: root/youtube/yt_data_extract
diff options
context:
space:
mode:
authorLibravatarLibravatar Biswakalyan Bhuyan <biswa@surgot.in> 2024-09-19 15:33:11 +0530
committerLibravatarLibravatar Biswakalyan Bhuyan <biswa@surgot.in> 2024-09-19 15:33:11 +0530
commita4e01da27c08e43a67b2618ad1e71c1f8f86d5cd (patch)
tree5b8f407dbb7e9d1ab2106ac0cc8564897e7a2098 /youtube/yt_data_extract
downloadyt-local-master.tar.gz
yt-local-master.tar.bz2
yt-local-master.zip
youtube fronendHEADmaster
Diffstat (limited to 'youtube/yt_data_extract')
-rw-r--r--youtube/yt_data_extract/__init__.py13
-rw-r--r--youtube/yt_data_extract/common.py610
-rw-r--r--youtube/yt_data_extract/everything_else.py372
-rw-r--r--youtube/yt_data_extract/watch_extraction.py948
4 files changed, 1943 insertions, 0 deletions
diff --git a/youtube/yt_data_extract/__init__.py b/youtube/yt_data_extract/__init__.py
new file mode 100644
index 0000000..de1812d
--- /dev/null
+++ b/youtube/yt_data_extract/__init__.py
@@ -0,0 +1,13 @@
+from .common import (get, multi_get, deep_get, multi_deep_get,
+ liberal_update, conservative_update, remove_redirect, normalize_url,
+ extract_str, extract_formatted_text, extract_int, extract_approx_int,
+ extract_date, extract_item_info, extract_items, extract_response)
+
+from .everything_else import (extract_channel_info, extract_search_info,
+ extract_playlist_metadata, extract_playlist_info, extract_comments_info)
+
+from .watch_extraction import (extract_watch_info, get_caption_url,
+ update_with_new_urls, requires_decryption,
+ extract_decryption_function, decrypt_signatures, _formats,
+ update_format_with_type_info, extract_hls_formats,
+ extract_watch_info_from_html, captions_available)
diff --git a/youtube/yt_data_extract/common.py b/youtube/yt_data_extract/common.py
new file mode 100644
index 0000000..7903db5
--- /dev/null
+++ b/youtube/yt_data_extract/common.py
@@ -0,0 +1,610 @@
+import re
+import urllib.parse
+import collections
+import collections.abc
+
+def get(object, key, default=None, types=()):
+ '''Like dict.get(), but returns default if the result doesn't match one of the types.
+ Also works for indexing lists.'''
+ try:
+ result = object[key]
+ except (TypeError, IndexError, KeyError):
+ return default
+
+ if not types or isinstance(result, types):
+ return result
+ else:
+ return default
+
+def multi_get(object, *keys, default=None, types=()):
+ '''Like get, but try other keys if the first fails'''
+ for key in keys:
+ try:
+ result = object[key]
+ except (TypeError, IndexError, KeyError):
+ pass
+ else:
+ if not types or isinstance(result, types):
+ return result
+ else:
+ continue
+ return default
+
+
+def deep_get(object, *keys, default=None, types=()):
+ '''Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices.
+ Last argument is the default value to use in case of any IndexErrors or KeyErrors.
+ If types is given and the result doesn't match one of those types, default is returned'''
+ try:
+ for key in keys:
+ object = object[key]
+ except (TypeError, IndexError, KeyError):
+ return default
+ else:
+ if not types or isinstance(object, types):
+ return object
+ else:
+ return default
+
+def multi_deep_get(object, *key_sequences, default=None, types=()):
+ '''Like deep_get, but can try different key sequences in case one fails.
+ Return default if all of them fail. key_sequences is a list of lists'''
+ for key_sequence in key_sequences:
+ _object = object
+ try:
+ for key in key_sequence:
+ _object = _object[key]
+ except (TypeError, IndexError, KeyError):
+ pass
+ else:
+ if not types or isinstance(_object, types):
+ return _object
+ else:
+ continue
+ return default
+
+
+def _is_empty(value):
+ '''Determines if value is None or an empty iterable, such as '' and []'''
+ if value is None:
+ return True
+ elif isinstance(value, collections.abc.Iterable) and not value:
+ return True
+ return False
+
+
+def liberal_update(obj, key, value):
+ '''Updates obj[key] with value as long as value is not None or empty.
+ Ensures obj[key] will at least get an empty value, however'''
+ if (not _is_empty(value)) or (key not in obj):
+ obj[key] = value
+
+def conservative_update(obj, key, value):
+ '''Only updates obj if it doesn't have key or obj[key] is None/empty'''
+ if _is_empty(obj.get(key)):
+ obj[key] = value
+
+
+def liberal_dict_update(dict1, dict2):
+ '''Update dict1 with keys from dict2 using liberal_update'''
+ for key, value in dict2.items():
+ liberal_update(dict1, key, value)
+
+
+def conservative_dict_update(dict1, dict2):
+ '''Update dict1 with keys from dict2 using conservative_update'''
+ for key, value in dict2.items():
+ conservative_update(dict1, key, value)
+
+
+def concat_or_none(*strings):
+ '''Concatenates strings. Returns None if any of the arguments are None'''
+ result = ''
+ for string in strings:
+ if string is None:
+ return None
+ result += string
+ return result
+
+def remove_redirect(url):
+ if url is None:
+ return None
+ if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # YouTube puts these on external links to do tracking
+ query_string = url[url.find('?')+1: ]
+ return urllib.parse.parse_qs(query_string)['q'][0]
+ return url
+
+norm_url_re = re.compile(r'^(?:(?:https?:)?//)?((?:[\w-]+\.)+[\w-]+)?(/.*)$')
+def normalize_url(url):
+ '''Insert https, resolve relative paths for youtube.com, and put www. infront of youtube.com'''
+ if url is None:
+ return None
+ match = norm_url_re.fullmatch(url)
+ if match is None:
+ raise Exception(url)
+
+ domain = match.group(1) or 'www.youtube.com'
+ if domain == 'youtube.com':
+ domain = 'www.youtube.com'
+
+ return 'https://' + domain + match.group(2)
+
+def _recover_urls(runs):
+ for run in runs:
+ url = deep_get(run, 'navigationEndpoint', 'urlEndpoint', 'url')
+ text = run.get('text', '')
+ # second condition is necessary because YouTube makes other things into urls, such as hashtags, which we want to keep as text
+ if url is not None and (text.startswith('http://') or text.startswith('https://')):
+ url = remove_redirect(url)
+ run['url'] = url
+ run['text'] = url # YouTube truncates the url text, use actual url instead
+
+def extract_str(node, default=None, recover_urls=False):
+ '''default is the value returned if the extraction fails. If recover_urls is true, will attempt to fix YouTube's truncation of url text (most prominently seen in descriptions)'''
+ if isinstance(node, str):
+ return node
+
+ try:
+ return node['simpleText']
+ except (KeyError, TypeError):
+ pass
+
+ if isinstance(node, dict) and 'runs' in node:
+ if recover_urls:
+ _recover_urls(node['runs'])
+ return ''.join(text_run.get('text', '') for text_run in node['runs'])
+
+ return default
+
+def extract_formatted_text(node):
+ if not node:
+ return []
+ if 'runs' in node:
+ _recover_urls(node['runs'])
+ return node['runs']
+ elif 'simpleText' in node:
+ return [{'text': node['simpleText']}]
+ return []
+
+def extract_int(string, default=None, whole_word=True):
+ if isinstance(string, int):
+ return string
+ if not isinstance(string, str):
+ string = extract_str(string)
+ if not string:
+ return default
+ if whole_word:
+ match = re.search(r'\b(\d+)\b', string.replace(',', ''))
+ else:
+ match = re.search(r'(\d+)', string.replace(',', ''))
+ if match is None:
+ return default
+ try:
+ return int(match.group(1))
+ except ValueError:
+ return default
+
+def extract_approx_int(string):
+ '''e.g. "15.1M" from "15.1M subscribers" or '4,353' from 4353'''
+ if not isinstance(string, str):
+ string = extract_str(string)
+ if not string:
+ return None
+ match = re.search(r'\b(\d+(?:\.\d+)?[KMBTkmbt]?)\b', string.replace(',', ''))
+ if match is None:
+ return None
+ result = match.group(1)
+ if re.fullmatch(r'\d+', result):
+ result = '{:,}'.format(int(result))
+ return result
+
+MONTH_ABBREVIATIONS = {'jan':'1', 'feb':'2', 'mar':'3', 'apr':'4', 'may':'5', 'jun':'6', 'jul':'7', 'aug':'8', 'sep':'9', 'oct':'10', 'nov':'11', 'dec':'12'}
+def extract_date(date_text):
+ '''Input: "Mar 9, 2019". Output: "2019-3-9"'''
+ if not isinstance(date_text, str):
+ date_text = extract_str(date_text)
+ if date_text is None:
+ return None
+
+ date_text = date_text.replace(',', '').lower()
+ parts = date_text.split()
+ if len(parts) >= 3:
+ month, day, year = parts[-3:]
+ month = MONTH_ABBREVIATIONS.get(month[0:3]) # slicing in case they start writing out the full month name
+ if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None):
+ return year + '-' + month + '-' + day
+ return None
+
+def check_missing_keys(object, *key_sequences):
+ for key_sequence in key_sequences:
+ _object = object
+ try:
+ for key in key_sequence:
+ _object = _object[key]
+ except (KeyError, IndexError, TypeError):
+ return 'Could not find ' + key
+
+ return None
+
+def extract_item_info(item, additional_info={}):
+ if not item:
+ return {'error': 'No item given'}
+
+ type = get(list(item.keys()), 0)
+ if not type:
+ return {'error': 'Could not find type'}
+ item = item[type]
+
+ info = {'error': None}
+ if type in ('itemSectionRenderer', 'compactAutoplayRenderer'):
+ return extract_item_info(deep_get(item, 'contents', 0), additional_info)
+
+ if type in ('movieRenderer', 'clarificationRenderer'):
+ info['type'] = 'unsupported'
+ return info
+
+ # type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer'
+ # camelCase split, https://stackoverflow.com/a/37697078
+ type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()]
+ if len(type_parts) < 2:
+ info['type'] = 'unsupported'
+ return
+ primary_type = type_parts[-2]
+ if primary_type == 'video':
+ info['type'] = 'video'
+ elif type_parts[0] == 'reel': # shorts
+ info['type'] = 'video'
+ primary_type = 'video'
+ elif primary_type in ('playlist', 'radio', 'show'):
+ info['type'] = 'playlist'
+ info['playlist_type'] = primary_type
+ elif primary_type == 'channel':
+ info['type'] = 'channel'
+ elif type == 'videoWithContextRenderer': # stupid exception
+ info['type'] = 'video'
+ primary_type = 'video'
+ else:
+ info['type'] = 'unsupported'
+
+ # videoWithContextRenderer changes it to 'headline' just to be annoying
+ info['title'] = extract_str(multi_get(item, 'title', 'headline'))
+ if primary_type != 'channel':
+ info['author'] = extract_str(multi_get(item, 'longBylineText', 'shortBylineText', 'ownerText'))
+ info['author_id'] = extract_str(multi_deep_get(item,
+ ['longBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
+ ['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
+ ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId']
+ ))
+ info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None
+ info['description'] = extract_formatted_text(multi_deep_get(
+ item,
+ ['descriptionText'], ['descriptionSnippet'],
+ ['detailedMetadataSnippets', 0, 'snippetText'],
+ ))
+ info['thumbnail'] = normalize_url(multi_deep_get(item,
+ ['thumbnail', 'thumbnails', 0, 'url'], # videos
+ ['thumbnails', 0, 'thumbnails', 0, 'url'], # playlists
+ ['thumbnailRenderer', 'showCustomThumbnailRenderer', 'thumbnail', 'thumbnails', 0, 'url'], # shows
+ ))
+
+ info['badges'] = []
+ for badge_node in multi_get(item, 'badges', 'ownerBadges', default=()):
+ badge = deep_get(badge_node, 'metadataBadgeRenderer', 'label')
+ if badge:
+ info['badges'].append(badge)
+
+ if primary_type in ('video', 'playlist'):
+ info['time_published'] = None
+ timestamp = re.search(r'(\d+ \w+ ago)',
+ extract_str(item.get('publishedTimeText'), default=''))
+ if timestamp:
+ info['time_published'] = timestamp.group(1)
+
+ if primary_type == 'video':
+ info['id'] = multi_deep_get(item,
+ ['videoId'],
+ ['navigationEndpoint', 'watchEndpoint', 'videoId'],
+ ['navigationEndpoint', 'reelWatchEndpoint', 'videoId'] # shorts
+ )
+ info['view_count'] = extract_int(item.get('viewCountText'))
+
+ # dig into accessibility data to get view_count for videos marked as recommended, and to get time_published
+ accessibility_label = multi_deep_get(item,
+ ['title', 'accessibility', 'accessibilityData', 'label'],
+ ['headline', 'accessibility', 'accessibilityData', 'label'],
+ default='')
+ timestamp = re.search(r'(\d+ \w+ ago)', accessibility_label)
+ if timestamp:
+ conservative_update(info, 'time_published', timestamp.group(1))
+ view_count = re.search(r'(\d+) views', accessibility_label.replace(',', ''))
+ if view_count:
+ conservative_update(info, 'view_count', int(view_count.group(1)))
+
+ if info['view_count']:
+ info['approx_view_count'] = '{:,}'.format(info['view_count'])
+ else:
+ info['approx_view_count'] = extract_approx_int(multi_get(item,
+ 'shortViewCountText',
+ 'viewCountText' # shorts
+ ))
+
+ # handle case where it is "No views"
+ if not info['approx_view_count']:
+ if ('No views' in item.get('shortViewCountText', '')
+ or 'no views' in accessibility_label.lower()
+ or 'No views' in extract_str(item.get('viewCountText', '')) # shorts
+ ):
+ info['view_count'] = 0
+ info['approx_view_count'] = '0'
+
+ info['duration'] = extract_str(item.get('lengthText'))
+
+ # dig into accessibility data to get duration for shorts
+ accessibility_label = deep_get(item,
+ 'accessibility', 'accessibilityData', 'label',
+ default='')
+ duration = re.search(r'(\d+) (second|seconds|minute) - play video$',
+ accessibility_label)
+ if duration:
+ if duration.group(2) == 'minute':
+ conservative_update(info, 'duration', '1:00')
+ else:
+ conservative_update(info,
+ 'duration', '0:' + duration.group(1).zfill(2))
+
+ # if it's an item in a playlist, get its index
+ if 'index' in item: # url has wrong index on playlist page
+ info['index'] = extract_int(item.get('index'))
+ elif 'indexText' in item:
+ # Current item in playlist has ▶ instead of the actual index, must
+ # dig into url
+ match = re.search(r'index=(\d+)', deep_get(item,
+ 'navigationEndpoint', 'commandMetadata', 'webCommandMetadata',
+ 'url', default=''))
+ if match is None: # worth a try then
+ info['index'] = extract_int(item.get('indexText'))
+ else:
+ info['index'] = int(match.group(1))
+ else:
+ info['index'] = None
+
+ elif primary_type in ('playlist', 'radio'):
+ info['id'] = item.get('playlistId')
+ info['video_count'] = extract_int(item.get('videoCount'))
+ info['first_video_id'] = deep_get(item, 'navigationEndpoint',
+ 'watchEndpoint', 'videoId')
+ elif primary_type == 'channel':
+ info['id'] = item.get('channelId')
+ info['approx_subscriber_count'] = extract_approx_int(item.get('subscriberCountText'))
+ elif primary_type == 'show':
+ info['id'] = deep_get(item, 'navigationEndpoint', 'watchEndpoint', 'playlistId')
+ info['first_video_id'] = deep_get(item, 'navigationEndpoint',
+ 'watchEndpoint', 'videoId')
+
+ if primary_type in ('playlist', 'channel'):
+ conservative_update(info, 'video_count', extract_int(item.get('videoCountText')))
+
+ for overlay in item.get('thumbnailOverlays', []):
+ conservative_update(info, 'duration', extract_str(deep_get(
+ overlay, 'thumbnailOverlayTimeStatusRenderer', 'text'
+ )))
+ # show renderers don't have videoCountText
+ conservative_update(info, 'video_count', extract_int(deep_get(
+ overlay, 'thumbnailOverlayBottomPanelRenderer', 'text'
+ )))
+
+ info.update(additional_info)
+
+ return info
+
+def extract_response(polymer_json):
+ '''return response, error'''
+ # /youtubei/v1/browse endpoint returns response directly
+ if isinstance(polymer_json, dict) and 'responseContext' in polymer_json:
+ # this is the response
+ return polymer_json, None
+
+ response = multi_deep_get(polymer_json, [1, 'response'], ['response'])
+ if response is None:
+ return None, 'Failed to extract response'
+ else:
+ return response, None
+
+
+_item_types = {
+ 'movieRenderer',
+ 'didYouMeanRenderer',
+ 'showingResultsForRenderer',
+
+ 'videoRenderer',
+ 'compactVideoRenderer',
+ 'compactAutoplayRenderer',
+ 'videoWithContextRenderer',
+ 'gridVideoRenderer',
+ 'playlistVideoRenderer',
+
+ 'reelItemRenderer',
+
+ 'playlistRenderer',
+ 'compactPlaylistRenderer',
+ 'gridPlaylistRenderer',
+
+ 'radioRenderer',
+ 'compactRadioRenderer',
+ 'gridRadioRenderer',
+
+ 'showRenderer',
+ 'compactShowRenderer',
+ 'gridShowRenderer',
+
+
+ 'channelRenderer',
+ 'compactChannelRenderer',
+ 'gridChannelRenderer',
+}
+
+def _traverse_browse_renderer(renderer):
+ for tab in get(renderer, 'tabs', ()):
+ tab_renderer = multi_get(tab, 'tabRenderer', 'expandableTabRenderer')
+ if tab_renderer is None:
+ continue
+ if tab_renderer.get('selected', False):
+ return get(tab_renderer, 'content', {})
+ print('Could not find tab with content')
+ return {}
+
+def _traverse_standard_list(renderer):
+ renderer_list = multi_get(renderer, 'contents', 'items', default=())
+ continuation = deep_get(renderer, 'continuations', 0, 'nextContinuationData', 'continuation')
+ return renderer_list, continuation
+
+# these renderers contain one inside them
+nested_renderer_dispatch = {
+ 'singleColumnBrowseResultsRenderer': _traverse_browse_renderer,
+ 'twoColumnBrowseResultsRenderer': _traverse_browse_renderer,
+ 'twoColumnSearchResultsRenderer': lambda r: get(r, 'primaryContents', {}),
+ 'richItemRenderer': lambda r: get(r, 'content', {}),
+ 'engagementPanelSectionListRenderer': lambda r: get(r, 'content', {}),
+}
+
+# these renderers contain a list of renderers inside them
+nested_renderer_list_dispatch = {
+ 'sectionListRenderer': _traverse_standard_list,
+ 'itemSectionRenderer': _traverse_standard_list,
+ 'gridRenderer': _traverse_standard_list,
+ 'richGridRenderer': _traverse_standard_list,
+ 'playlistVideoListRenderer': _traverse_standard_list,
+ 'structuredDescriptionContentRenderer': _traverse_standard_list,
+ 'slimVideoMetadataSectionRenderer': _traverse_standard_list,
+ 'singleColumnWatchNextResults': lambda r: (deep_get(r, 'results', 'results', 'contents', default=[]), None),
+}
+def get_nested_renderer_list_function(key):
+ if key in nested_renderer_list_dispatch:
+ return nested_renderer_list_dispatch[key]
+ elif key.endswith('Continuation'):
+ return _traverse_standard_list
+ return None
+
+def extract_items_from_renderer(renderer, item_types=_item_types):
+ ctoken = None
+ items = []
+
+ iter_stack = collections.deque()
+ current_iter = iter(())
+
+ while True:
+ # mode 1: get a new renderer by iterating.
+ # goes down the stack for an iterator if one has been exhausted
+ if not renderer:
+ try:
+ renderer = current_iter.__next__()
+ except StopIteration:
+ try:
+ current_iter = iter_stack.pop()
+ except IndexError:
+ return items, ctoken
+ # Get new renderer or check that the one we got is good before
+ # proceeding to mode 2
+ continue
+
+
+ # mode 2: dig into the current renderer
+ key, value = list(renderer.items())[0]
+
+ # the renderer is an item
+ if key in item_types:
+ items.append(renderer)
+
+ # ctoken sometimes placed in these renderers, e.g. channel playlists
+ elif key == 'continuationItemRenderer':
+ cont = deep_get(
+ value, 'continuationEndpoint', 'continuationCommand', 'token'
+ )
+ if cont:
+ ctoken = cont
+
+ # has a list in it, add it to the iter stack
+ elif get_nested_renderer_list_function(key):
+ renderer_list, cont = get_nested_renderer_list_function(key)(value)
+ if renderer_list:
+ iter_stack.append(current_iter)
+ current_iter = iter(renderer_list)
+ if cont:
+ ctoken = cont
+
+ # new renderer nested inside this one
+ elif key in nested_renderer_dispatch:
+ renderer = nested_renderer_dispatch[key](value)
+ continue # don't reset renderer to None
+
+ renderer = None
+
+
+def extract_items_from_renderer_list(renderers, item_types=_item_types):
+ '''Same as extract_items_from_renderer, but provide a list of renderers'''
+ items = []
+ ctoken = None
+ for renderer in renderers:
+ new_items, new_ctoken = extract_items_from_renderer(
+ renderer,
+ item_types=item_types)
+ items += new_items
+ # prioritize ctoken associated with items
+ if (not ctoken) or (new_ctoken and new_items):
+ ctoken = new_ctoken
+ return items, ctoken
+
+
+def extract_items(response, item_types=_item_types,
+ search_engagement_panels=False):
+ '''return items, ctoken'''
+ items = []
+ ctoken = None
+ if 'continuationContents' in response:
+ # sometimes there's another, empty, junk [something]Continuation key
+ # find real one
+ for key, renderer_cont in get(response,
+ 'continuationContents', {}).items():
+ # e.g. commentSectionContinuation, playlistVideoListContinuation
+ if key.endswith('Continuation'):
+ items, ctoken = extract_items_from_renderer(
+ {key: renderer_cont},
+ item_types=item_types)
+ if items:
+ break
+ if ('onResponseReceivedEndpoints' in response
+ or 'onResponseReceivedActions' in response):
+ for endpoint in multi_get(response,
+ 'onResponseReceivedEndpoints',
+ 'onResponseReceivedActions',
+ []):
+ new_items, new_ctoken = extract_items_from_renderer_list(
+ multi_deep_get(
+ endpoint,
+ ['reloadContinuationItemsCommand', 'continuationItems'],
+ ['appendContinuationItemsAction', 'continuationItems'],
+ default=[]
+ ),
+ item_types=item_types,
+ )
+ items += new_items
+ if (not ctoken) or (new_ctoken and new_items):
+ ctoken = new_ctoken
+ if 'contents' in response:
+ renderer = get(response, 'contents', {})
+ new_items, new_ctoken = extract_items_from_renderer(
+ renderer,
+ item_types=item_types)
+ items += new_items
+ if (not ctoken) or (new_ctoken and new_items):
+ ctoken = new_ctoken
+
+ if search_engagement_panels and 'engagementPanels' in response:
+ new_items, new_ctoken = extract_items_from_renderer_list(
+ response['engagementPanels'], item_types=item_types
+ )
+ items += new_items
+ if (not ctoken) or (new_ctoken and new_items):
+ ctoken = new_ctoken
+
+ return items, ctoken
diff --git a/youtube/yt_data_extract/everything_else.py b/youtube/yt_data_extract/everything_else.py
new file mode 100644
index 0000000..0f64649
--- /dev/null
+++ b/youtube/yt_data_extract/everything_else.py
@@ -0,0 +1,372 @@
+from .common import (get, multi_get, deep_get, multi_deep_get,
+ liberal_update, conservative_update, remove_redirect, normalize_url,
+ extract_str, extract_formatted_text, extract_int, extract_approx_int,
+ extract_date, check_missing_keys, extract_item_info, extract_items,
+ extract_response)
+from youtube import proto
+
+import re
+import urllib
+from math import ceil
+
+def extract_channel_info(polymer_json, tab, continuation=False):
+ response, err = extract_response(polymer_json)
+ if err:
+ return {'error': err}
+
+
+ metadata = deep_get(response, 'metadata', 'channelMetadataRenderer',
+ default={})
+ if not metadata:
+ metadata = deep_get(response, 'microformat', 'microformatDataRenderer',
+ default={})
+
+ # channel doesn't exist or was terminated
+ # example terminated channel: https://www.youtube.com/channel/UCnKJeK_r90jDdIuzHXC0Org
+ # metadata and microformat are not present for continuation requests
+ if not metadata and not continuation:
+ if response.get('alerts'):
+ error_string = ' '.join(
+ extract_str(deep_get(alert, 'alertRenderer', 'text'), default='')
+ for alert in response['alerts']
+ )
+ if not error_string:
+ error_string = 'Failed to extract error'
+ return {'error': error_string}
+ elif deep_get(response, 'responseContext', 'errors'):
+ for error in response['responseContext']['errors'].get('error', []):
+ if error.get('code') == 'INVALID_VALUE' and error.get('location') == 'browse_id':
+ return {'error': 'This channel does not exist'}
+ return {'error': 'Failure getting metadata'}
+
+ info = {'error': None}
+ info['current_tab'] = tab
+
+ info['approx_subscriber_count'] = extract_approx_int(deep_get(response,
+ 'header', 'c4TabbedHeaderRenderer', 'subscriberCountText'))
+
+ # stuff from microformat (info given by youtube for first page on channel)
+ info['short_description'] = metadata.get('description')
+ if info['short_description'] and len(info['short_description']) > 730:
+ info['short_description'] = info['short_description'][0:730] + '...'
+ info['channel_name'] = metadata.get('title')
+ info['avatar'] = normalize_url(multi_deep_get(metadata,
+ ['avatar', 'thumbnails', 0, 'url'],
+ ['thumbnail', 'thumbnails', 0, 'url'],
+ ))
+ channel_url = multi_get(metadata, 'urlCanonical', 'channelUrl')
+ if channel_url:
+ channel_id = get(channel_url.rstrip('/').split('/'), -1)
+ info['channel_id'] = channel_id
+ else:
+ info['channel_id'] = metadata.get('externalId')
+ if info['channel_id']:
+ info['channel_url'] = 'https://www.youtube.com/channel/' + channel_id
+ else:
+ info['channel_url'] = None
+
+ # get items
+ info['items'] = []
+ info['ctoken'] = None
+
+ # empty channel
+ #if 'contents' not in response and 'continuationContents' not in response:
+ # return info
+
+ if tab in ('videos', 'shorts', 'streams', 'playlists', 'search'):
+ items, ctoken = extract_items(response)
+ additional_info = {
+ 'author': info['channel_name'],
+ 'author_id': info['channel_id'],
+ 'author_url': info['channel_url'],
+ }
+ info['items'] = [extract_item_info(renderer, additional_info) for renderer in items]
+ info['ctoken'] = ctoken
+ if tab in ('search', 'playlists'):
+ info['is_last_page'] = (ctoken is None)
+ elif tab == 'about':
+ # Latest type
+ items, _ = extract_items(response, item_types={'aboutChannelRenderer'})
+ if items:
+ a_metadata = deep_get(items, 0, 'aboutChannelRenderer',
+ 'metadata', 'aboutChannelViewModel')
+ if not a_metadata:
+ info['error'] = 'Could not find aboutChannelViewModel'
+ return info
+
+ info['links'] = []
+ for link_outer in a_metadata.get('links', ()):
+ link = link_outer.get('channelExternalLinkViewModel') or {}
+ link_content = extract_str(deep_get(link, 'link', 'content'))
+ for run in deep_get(link, 'link', 'commandRuns') or ():
+ url = remove_redirect(deep_get(run, 'onTap',
+ 'innertubeCommand', 'urlEndpoint', 'url'))
+ if url and not (url.startswith('http://')
+ or url.startswith('https://')):
+ url = 'https://' + url
+ if link_content is None or (link_content in url):
+ break
+ else: # didn't break
+ url = link_content
+ if url and not (url.startswith('http://')
+ or url.startswith('https://')):
+ url = 'https://' + url
+ text = extract_str(deep_get(link, 'title', 'content'))
+ info['links'].append( (text, url) )
+
+ info['date_joined'] = extract_date(
+ a_metadata.get('joinedDateText')
+ )
+ info['view_count'] = extract_int(a_metadata.get('viewCountText'))
+ info['approx_view_count'] = extract_approx_int(
+ a_metadata.get('viewCountText')
+ )
+ info['description'] = extract_str(
+ a_metadata.get('description'), default=''
+ )
+ info['approx_video_count'] = extract_approx_int(
+ a_metadata.get('videoCountText')
+ )
+ info['approx_subscriber_count'] = extract_approx_int(
+ a_metadata.get('subscriberCountText')
+ )
+ info['country'] = extract_str(a_metadata.get('country'))
+ info['canonical_url'] = extract_str(
+ a_metadata.get('canonicalChannelUrl')
+ )
+
+ # Old type
+ else:
+ items, _ = extract_items(response,
+ item_types={'channelAboutFullMetadataRenderer'})
+ if not items:
+ info['error'] = 'Could not find aboutChannelRenderer or channelAboutFullMetadataRenderer'
+ return info
+ a_metadata = items[0]['channelAboutFullMetadataRenderer']
+
+ info['links'] = []
+ for link_json in a_metadata.get('primaryLinks', ()):
+ url = remove_redirect(deep_get(link_json, 'navigationEndpoint',
+ 'urlEndpoint', 'url'))
+ if url and not (url.startswith('http://')
+ or url.startswith('https://')):
+ url = 'https://' + url
+ text = extract_str(link_json.get('title'))
+ info['links'].append( (text, url) )
+
+ info['date_joined'] = extract_date(a_metadata.get('joinedDateText'))
+ info['view_count'] = extract_int(a_metadata.get('viewCountText'))
+ info['description'] = extract_str(a_metadata.get(
+ 'description'), default='')
+
+ info['approx_video_count'] = None
+ info['approx_subscriber_count'] = None
+ info['country'] = None
+ info['canonical_url'] = None
+ else:
+ raise NotImplementedError('Unknown or unsupported channel tab: ' + tab)
+
+ return info
+
+def extract_search_info(polymer_json):
+ response, err = extract_response(polymer_json)
+ if err:
+ return {'error': err}
+ info = {'error': None}
+ info['estimated_results'] = int(response['estimatedResults'])
+ info['estimated_pages'] = ceil(info['estimated_results']/20)
+
+
+ results, _ = extract_items(response)
+
+
+ info['items'] = []
+ info['corrections'] = {'type': None}
+ for renderer in results:
+ type = list(renderer.keys())[0]
+ if type == 'shelfRenderer':
+ continue
+ if type == 'didYouMeanRenderer':
+ renderer = renderer[type]
+
+ info['corrections'] = {
+ 'type': 'did_you_mean',
+ 'corrected_query': renderer['correctedQueryEndpoint']['searchEndpoint']['query'],
+ 'corrected_query_text': renderer['correctedQuery']['runs'],
+ }
+ continue
+ if type == 'showingResultsForRenderer':
+ renderer = renderer[type]
+
+ info['corrections'] = {
+ 'type': 'showing_results_for',
+ 'corrected_query_text': renderer['correctedQuery']['runs'],
+ 'original_query_text': renderer['originalQuery']['simpleText'],
+ }
+ continue
+
+ i_info = extract_item_info(renderer)
+ if i_info.get('type') != 'unsupported':
+ info['items'].append(i_info)
+
+
+ return info
+
+def extract_playlist_metadata(polymer_json):
+ response, err = extract_response(polymer_json)
+ if err:
+ return {'error': err}
+
+ metadata = {'error': None}
+ header = deep_get(response, 'header', 'playlistHeaderRenderer', default={})
+ metadata['title'] = extract_str(header.get('title'))
+
+ metadata['first_video_id'] = deep_get(header, 'playEndpoint', 'watchEndpoint', 'videoId')
+ first_id = re.search(r'([a-z_\-]{11})', deep_get(header,
+ 'thumbnail', 'thumbnails', 0, 'url', default=''))
+ if first_id:
+ conservative_update(metadata, 'first_video_id', first_id.group(1))
+ if metadata['first_video_id'] is None:
+ metadata['thumbnail'] = None
+ else:
+ metadata['thumbnail'] = f"https://i.ytimg.com/vi/{metadata['first_video_id']}/hqdefault.jpg"
+
+ metadata['video_count'] = extract_int(header.get('numVideosText'))
+ metadata['description'] = extract_str(header.get('descriptionText'), default='')
+ metadata['author'] = extract_str(header.get('ownerText'))
+ metadata['author_id'] = multi_deep_get(header,
+ ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'],
+ ['ownerEndpoint', 'browseEndpoint', 'browseId'])
+ if metadata['author_id']:
+ metadata['author_url'] = 'https://www.youtube.com/channel/' + metadata['author_id']
+ else:
+ metadata['author_url'] = None
+ metadata['view_count'] = extract_int(header.get('viewCountText'))
+ metadata['like_count'] = extract_int(header.get('likesCountWithoutLikeText'))
+ for stat in header.get('stats', ()):
+ text = extract_str(stat)
+ if 'videos' in text:
+ conservative_update(metadata, 'video_count', extract_int(text))
+ elif 'views' in text:
+ conservative_update(metadata, 'view_count', extract_int(text))
+ elif 'updated' in text:
+ metadata['time_published'] = extract_date(text)
+
+ microformat = deep_get(response, 'microformat', 'microformatDataRenderer',
+ default={})
+ conservative_update(
+ metadata, 'title', extract_str(microformat.get('title'))
+ )
+ conservative_update(
+ metadata, 'description', extract_str(microformat.get('description'))
+ )
+ conservative_update(
+ metadata, 'thumbnail', deep_get(microformat, 'thumbnail',
+ 'thumbnails', -1, 'url')
+ )
+
+ return metadata
+
+def extract_playlist_info(polymer_json):
+ response, err = extract_response(polymer_json)
+ if err:
+ return {'error': err}
+ info = {'error': None}
+ video_list, _ = extract_items(response)
+
+ info['items'] = [extract_item_info(renderer) for renderer in video_list]
+
+ info['metadata'] = extract_playlist_metadata(polymer_json)
+
+ return info
+
+def _ctoken_metadata(ctoken):
+ result = dict()
+ params = proto.parse(proto.b64_to_bytes(ctoken))
+ result['video_id'] = proto.parse(params[2])[2].decode('ascii')
+
+ offset_information = proto.parse(params[6])
+ result['offset'] = offset_information.get(5, 0)
+
+ result['is_replies'] = False
+ if (3 in offset_information) and (2 in proto.parse(offset_information[3])):
+ result['is_replies'] = True
+ result['sort'] = None
+ else:
+ try:
+ result['sort'] = proto.parse(offset_information[4])[6]
+ except KeyError:
+ result['sort'] = 0
+ return result
+
+def extract_comments_info(polymer_json, ctoken=None):
+ response, err = extract_response(polymer_json)
+ if err:
+ return {'error': err}
+ info = {'error': None}
+
+ if ctoken:
+ metadata = _ctoken_metadata(ctoken)
+ else:
+ metadata = {}
+ info['video_id'] = metadata.get('video_id')
+ info['offset'] = metadata.get('offset')
+ info['is_replies'] = metadata.get('is_replies')
+ info['sort'] = metadata.get('sort')
+ info['video_title'] = None
+
+ comments, ctoken = extract_items(response,
+ item_types={'commentThreadRenderer', 'commentRenderer'})
+ info['comments'] = []
+ info['ctoken'] = ctoken
+ for comment in comments:
+ comment_info = {}
+
+ if 'commentThreadRenderer' in comment: # top level comments
+ conservative_update(info, 'is_replies', False)
+ comment_thread = comment['commentThreadRenderer']
+ info['video_title'] = extract_str(comment_thread.get('commentTargetTitle'))
+ if 'replies' not in comment_thread:
+ comment_info['reply_count'] = 0
+ comment_info['reply_ctoken'] = None
+ else:
+ comment_info['reply_count'] = extract_int(deep_get(comment_thread,
+ 'replies', 'commentRepliesRenderer', 'moreText'
+ ), default=1) # With 1 reply, the text reads "View reply"
+ comment_info['reply_ctoken'] = multi_deep_get(
+ comment_thread,
+ ['replies', 'commentRepliesRenderer', 'contents', 0,
+ 'continuationItemRenderer', 'button', 'buttonRenderer',
+ 'command', 'continuationCommand', 'token'],
+ ['replies', 'commentRepliesRenderer', 'continuations', 0,
+ 'nextContinuationData', 'continuation']
+ )
+ comment_renderer = deep_get(comment_thread, 'comment', 'commentRenderer', default={})
+ elif 'commentRenderer' in comment: # replies
+ comment_info['reply_count'] = 0 # replyCount, below, not present for replies even if the reply has further replies to it
+ comment_info['reply_ctoken'] = None
+ conservative_update(info, 'is_replies', True)
+ comment_renderer = comment['commentRenderer']
+ else:
+ comment_renderer = {}
+
+ # These 3 are sometimes absent, likely because the channel was deleted
+ comment_info['author'] = extract_str(comment_renderer.get('authorText'))
+ comment_info['author_url'] = normalize_url(deep_get(comment_renderer,
+ 'authorEndpoint', 'commandMetadata', 'webCommandMetadata', 'url'))
+ comment_info['author_id'] = deep_get(comment_renderer,
+ 'authorEndpoint', 'browseEndpoint', 'browseId')
+
+ comment_info['author_avatar'] = normalize_url(deep_get(
+ comment_renderer, 'authorThumbnail', 'thumbnails', 0, 'url'))
+ comment_info['id'] = comment_renderer.get('commentId')
+ comment_info['text'] = extract_formatted_text(comment_renderer.get('contentText'))
+ comment_info['time_published'] = extract_str(comment_renderer.get('publishedTimeText'))
+ comment_info['like_count'] = comment_renderer.get('likeCount')
+ comment_info['approx_like_count'] = extract_approx_int(
+ comment_renderer.get('voteCount'))
+ liberal_update(comment_info, 'reply_count', comment_renderer.get('replyCount'))
+
+ info['comments'].append(comment_info)
+
+ return info
diff --git a/youtube/yt_data_extract/watch_extraction.py b/youtube/yt_data_extract/watch_extraction.py
new file mode 100644
index 0000000..e09e2d3
--- /dev/null
+++ b/youtube/yt_data_extract/watch_extraction.py
@@ -0,0 +1,948 @@
+from .common import (get, multi_get, deep_get, multi_deep_get,
+ liberal_update, conservative_update, remove_redirect, normalize_url,
+ extract_str, extract_formatted_text, extract_int, extract_approx_int,
+ extract_date, check_missing_keys, extract_item_info, extract_items,
+ extract_response, concat_or_none, liberal_dict_update,
+ conservative_dict_update)
+
+import json
+import urllib.parse
+import traceback
+import re
+
+# from https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/extractor/youtube.py
+_formats = {
+ '5': {'ext': 'flv', 'width': 400, 'height': 240, 'acodec': 'mp3', 'audio_bitrate': 64, 'vcodec': 'h263'},
+ '6': {'ext': 'flv', 'width': 450, 'height': 270, 'acodec': 'mp3', 'audio_bitrate': 64, 'vcodec': 'h263'},
+ '13': {'ext': '3gp', 'acodec': 'aac', 'vcodec': 'mp4v'},
+ '17': {'ext': '3gp', 'width': 176, 'height': 144, 'acodec': 'aac', 'audio_bitrate': 24, 'vcodec': 'mp4v'},
+ '18': {'ext': 'mp4', 'width': 640, 'height': 360, 'acodec': 'aac', 'audio_bitrate': 96, 'vcodec': 'h264'},
+ '22': {'ext': 'mp4', 'width': 1280, 'height': 720, 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
+ '34': {'ext': 'flv', 'width': 640, 'height': 360, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+ '35': {'ext': 'flv', 'width': 854, 'height': 480, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+ # itag 36 videos are either 320x180 (BaW_jenozKc) or 320x240 (__2ABJjxzNo), audio_bitrate varies as well
+ '36': {'ext': '3gp', 'width': 320, 'acodec': 'aac', 'vcodec': 'mp4v'},
+ '37': {'ext': 'mp4', 'width': 1920, 'height': 1080, 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
+ '38': {'ext': 'mp4', 'width': 4096, 'height': 3072, 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
+ '43': {'ext': 'webm', 'width': 640, 'height': 360, 'acodec': 'vorbis', 'audio_bitrate': 128, 'vcodec': 'vp8'},
+ '44': {'ext': 'webm', 'width': 854, 'height': 480, 'acodec': 'vorbis', 'audio_bitrate': 128, 'vcodec': 'vp8'},
+ '45': {'ext': 'webm', 'width': 1280, 'height': 720, 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'},
+ '46': {'ext': 'webm', 'width': 1920, 'height': 1080, 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'},
+ '59': {'ext': 'mp4', 'width': 854, 'height': 480, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+ '78': {'ext': 'mp4', 'width': 854, 'height': 480, 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+
+
+ # 3D videos
+ '82': {'ext': 'mp4', 'height': 360, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+ '83': {'ext': 'mp4', 'height': 480, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+ '84': {'ext': 'mp4', 'height': 720, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
+ '85': {'ext': 'mp4', 'height': 1080, 'format_note': '3D', 'acodec': 'aac', 'audio_bitrate': 192, 'vcodec': 'h264'},
+ '100': {'ext': 'webm', 'height': 360, 'format_note': '3D', 'acodec': 'vorbis', 'audio_bitrate': 128, 'vcodec': 'vp8'},
+ '101': {'ext': 'webm', 'height': 480, 'format_note': '3D', 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'},
+ '102': {'ext': 'webm', 'height': 720, 'format_note': '3D', 'acodec': 'vorbis', 'audio_bitrate': 192, 'vcodec': 'vp8'},
+
+ # Apple HTTP Live Streaming
+ '91': {'ext': 'mp4', 'height': 144, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 48, 'vcodec': 'h264'},
+ '92': {'ext': 'mp4', 'height': 240, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 48, 'vcodec': 'h264'},
+ '93': {'ext': 'mp4', 'height': 360, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+ '94': {'ext': 'mp4', 'height': 480, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 128, 'vcodec': 'h264'},
+ '95': {'ext': 'mp4', 'height': 720, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 256, 'vcodec': 'h264'},
+ '96': {'ext': 'mp4', 'height': 1080, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 256, 'vcodec': 'h264'},
+ '132': {'ext': 'mp4', 'height': 240, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 48, 'vcodec': 'h264'},
+ '151': {'ext': 'mp4', 'height': 72, 'format_note': 'HLS', 'acodec': 'aac', 'audio_bitrate': 24, 'vcodec': 'h264'},
+
+ # DASH mp4 video
+ '133': {'ext': 'mp4', 'height': 240, 'format_note': 'DASH video', 'vcodec': 'h264'},
+ '134': {'ext': 'mp4', 'height': 360, 'format_note': 'DASH video', 'vcodec': 'h264'},
+ '135': {'ext': 'mp4', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'h264'},
+ '136': {'ext': 'mp4', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'h264'},
+ '137': {'ext': 'mp4', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'h264'},
+ '138': {'ext': 'mp4', 'format_note': 'DASH video', 'vcodec': 'h264'}, # Height can vary (https://github.com/ytdl-org/youtube-dl/issues/4559)
+ '160': {'ext': 'mp4', 'height': 144, 'format_note': 'DASH video', 'vcodec': 'h264'},
+ '212': {'ext': 'mp4', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'h264'},
+ '264': {'ext': 'mp4', 'height': 1440, 'format_note': 'DASH video', 'vcodec': 'h264'},
+ '298': {'ext': 'mp4', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'h264', 'fps': 60},
+ '299': {'ext': 'mp4', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'h264', 'fps': 60},
+ '266': {'ext': 'mp4', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'h264'},
+
+ # Dash mp4 audio
+ '139': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'audio_bitrate': 48, 'container': 'm4a_dash'},
+ '140': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'audio_bitrate': 128, 'container': 'm4a_dash'},
+ '141': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'audio_bitrate': 256, 'container': 'm4a_dash'},
+ '256': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'container': 'm4a_dash'},
+ '258': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'aac', 'container': 'm4a_dash'},
+ '325': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'dtse', 'container': 'm4a_dash'},
+ '328': {'ext': 'm4a', 'format_note': 'DASH audio', 'acodec': 'ec-3', 'container': 'm4a_dash'},
+
+ # Dash webm
+ '167': {'ext': 'webm', 'height': 360, 'width': 640, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
+ '168': {'ext': 'webm', 'height': 480, 'width': 854, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
+ '169': {'ext': 'webm', 'height': 720, 'width': 1280, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
+ '170': {'ext': 'webm', 'height': 1080, 'width': 1920, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
+ '218': {'ext': 'webm', 'height': 480, 'width': 854, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
+ '219': {'ext': 'webm', 'height': 480, 'width': 854, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp8'},
+ '278': {'ext': 'webm', 'height': 144, 'format_note': 'DASH video', 'container': 'webm', 'vcodec': 'vp9'},
+ '242': {'ext': 'webm', 'height': 240, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+ '243': {'ext': 'webm', 'height': 360, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+ '244': {'ext': 'webm', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+ '245': {'ext': 'webm', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+ '246': {'ext': 'webm', 'height': 480, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+ '247': {'ext': 'webm', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+ '248': {'ext': 'webm', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+ '271': {'ext': 'webm', 'height': 1440, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+ # itag 272 videos are either 3840x2160 (e.g. RtoitU2A-3E) or 7680x4320 (sLprVF6d7Ug)
+ '272': {'ext': 'webm', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+ '302': {'ext': 'webm', 'height': 720, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60},
+ '303': {'ext': 'webm', 'height': 1080, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60},
+ '308': {'ext': 'webm', 'height': 1440, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60},
+ '313': {'ext': 'webm', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'vp9'},
+ '315': {'ext': 'webm', 'height': 2160, 'format_note': 'DASH video', 'vcodec': 'vp9', 'fps': 60},
+
+ # Dash webm audio
+ '171': {'ext': 'webm', 'acodec': 'vorbis', 'format_note': 'DASH audio', 'audio_bitrate': 128},
+ '172': {'ext': 'webm', 'acodec': 'vorbis', 'format_note': 'DASH audio', 'audio_bitrate': 256},
+
+ # Dash webm audio with opus inside
+ '249': {'ext': 'webm', 'format_note': 'DASH audio', 'acodec': 'opus', 'audio_bitrate': 50},
+ '250': {'ext': 'webm', 'format_note': 'DASH audio', 'acodec': 'opus', 'audio_bitrate': 70},
+ '251': {'ext': 'webm', 'format_note': 'DASH audio', 'acodec': 'opus', 'audio_bitrate': 160},
+
+ # RTMP (unnamed)
+ '_rtmp': {'protocol': 'rtmp'},
+
+ # av01 video only formats sometimes served with "unknown" codecs
+ '394': {'vcodec': 'av01.0.05M.08'},
+ '395': {'vcodec': 'av01.0.05M.08'},
+ '396': {'vcodec': 'av01.0.05M.08'},
+ '397': {'vcodec': 'av01.0.05M.08'},
+}
+
+
+def _extract_from_video_information_renderer(renderer_content):
+ subtitle = extract_str(renderer_content.get('expandedSubtitle'),
+ default='')
+ info = {
+ 'title': extract_str(renderer_content.get('title')),
+ 'view_count': extract_int(subtitle),
+ 'unlisted': False,
+ 'live': 'watching' in subtitle,
+ }
+ for badge in renderer_content.get('badges', []):
+ if deep_get(badge, 'metadataBadgeRenderer', 'label') == 'Unlisted':
+ info['unlisted'] = True
+ return info
+
+def _extract_likes_dislikes(renderer_content):
+ def extract_button_count(toggle_button_renderer):
+ # all the digits can be found in the accessibility data
+ count = extract_int(multi_deep_get(
+ toggle_button_renderer,
+ ['defaultText', 'accessibility', 'accessibilityData', 'label'],
+ ['accessibility', 'label'],
+ ['accessibilityData', 'accessibilityData', 'label'],
+ ['accessibilityText'],
+ ))
+
+ # this count doesn't have all the digits, it's like 53K for instance
+ dumb_count = extract_int(extract_str(multi_get(
+ toggle_button_renderer, ['defaultText', 'title'])))
+
+ # The accessibility text will be "No likes" or "No dislikes" or
+ # something like that, but dumb count will be 0
+ if dumb_count == 0:
+ count = 0
+ return count
+
+ info = {
+ 'like_count': None,
+ 'dislike_count': None,
+ }
+ for button in renderer_content.get('buttons', ()):
+ if 'slimMetadataToggleButtonRenderer' in button:
+ button_renderer = button['slimMetadataToggleButtonRenderer']
+ count = extract_button_count(deep_get(button_renderer,
+ 'button',
+ 'toggleButtonRenderer'))
+ if 'isLike' in button_renderer:
+ info['like_count'] = count
+ elif 'isDislike' in button_renderer:
+ info['dislike_count'] = count
+ elif 'slimMetadataButtonRenderer' in button:
+ button_renderer = button['slimMetadataButtonRenderer']
+ liberal_update(info, 'like_count', extract_button_count(
+ multi_deep_get(button_renderer,
+ ['button', 'segmentedLikeDislikeButtonRenderer',
+ 'likeButton', 'toggleButtonRenderer'],
+ ['button', 'segmentedLikeDislikeButtonViewModel',
+ 'likeButtonViewModel', 'likeButtonViewModel',
+ 'toggleButtonViewModel', 'toggleButtonViewModel',
+ 'defaultButtonViewModel', 'buttonViewModel']
+ )
+ ))
+ '''liberal_update(info, 'dislike_count', extract_button_count(
+ deep_get(
+ button_renderer, 'button',
+ 'segmentedLikeDislikeButtonRenderer',
+ 'dislikeButton', 'toggleButtonRenderer'
+ )
+ ))'''
+ return info
+
+def _extract_from_owner_renderer(renderer_content):
+ return {
+ 'author': extract_str(renderer_content.get('title')),
+ 'author_id': deep_get(
+ renderer_content,
+ 'navigationEndpoint', 'browseEndpoint', 'browseId'),
+ }
+
+def _extract_from_video_header_renderer(renderer_content):
+ return {
+ 'title': extract_str(renderer_content.get('title')),
+ 'time_published': extract_date(extract_str(
+ renderer_content.get('publishDate'))),
+ }
+
+def _extract_from_description_renderer(renderer_content):
+ return {
+ 'description': extract_str(
+ renderer_content.get('descriptionBodyText'), recover_urls=True),
+ }
+
+def _extract_metadata_row_info(renderer_content):
+ # extract category and music list
+ info = {
+ 'category': None,
+ 'music_list': [],
+ }
+
+ current_song = {}
+ for row in deep_get(renderer_content, 'rows', default=[]):
+ row_title = extract_str(deep_get(row, 'metadataRowRenderer', 'title'), default='')
+ row_content = extract_str(deep_get(row, 'metadataRowRenderer', 'contents', 0))
+ if row_title == 'Category':
+ info['category'] = row_content
+ elif row_title in ('Song', 'Music'):
+ if current_song:
+ info['music_list'].append(current_song)
+ current_song = {'title': row_content}
+ elif row_title == 'Artist':
+ current_song['artist'] = row_content
+ elif row_title == 'Album':
+ current_song['album'] = row_content
+ elif row_title == 'Writers':
+ current_song['writers'] = row_content
+ elif row_title.startswith('Licensed'):
+ current_song['licensor'] = row_content
+ if current_song:
+ info['music_list'].append(current_song)
+
+ return info
+
+def _extract_from_music_renderer(renderer_content):
+ # latest format for the music list
+ info = {
+ 'music_list': [],
+ }
+
+ for carousel in renderer_content.get('carouselLockups', []):
+ song = {}
+ carousel = carousel.get('carouselLockupRenderer', {})
+ video_renderer = carousel.get('videoLockup', {})
+ video_renderer_info = extract_item_info(video_renderer)
+ video_id = video_renderer_info.get('id')
+ song['url'] = concat_or_none('https://www.youtube.com/watch?v=',
+ video_id)
+ song['title'] = video_renderer_info.get('title')
+ for row in carousel.get('infoRows', []):
+ row = row.get('infoRowRenderer', {})
+ title = extract_str(row.get('title'))
+ data = extract_str(row.get('defaultMetadata'))
+ if title == 'SONG':
+ song['title'] = data
+ elif title == 'ARTIST':
+ song['artist'] = data
+ elif title == 'ALBUM':
+ song['album'] = data
+ elif title == 'WRITERS':
+ song['writers'] = data
+ info['music_list'].append(song)
+ return info
+
+def _extract_from_video_metadata(renderer_content):
+ info = _extract_from_video_information_renderer(renderer_content)
+ liberal_dict_update(info, _extract_likes_dislikes(renderer_content))
+ liberal_dict_update(info, _extract_from_owner_renderer(renderer_content))
+ liberal_dict_update(info, _extract_metadata_row_info(deep_get(
+ renderer_content, 'metadataRowContainer',
+ 'metadataRowContainerRenderer', default={}
+ )))
+ liberal_update(info, 'title', extract_str(renderer_content.get('title')))
+ liberal_update(
+ info, 'description',
+ extract_str(renderer_content.get('description'), recover_urls=True)
+ )
+ liberal_update(info, 'time_published',
+ extract_date(renderer_content.get('dateText')))
+ return info
+
+visible_extraction_dispatch = {
+ # Either these ones spread around in various places
+ 'slimVideoInformationRenderer': _extract_from_video_information_renderer,
+ 'slimVideoActionBarRenderer': _extract_likes_dislikes,
+ 'slimOwnerRenderer': _extract_from_owner_renderer,
+ 'videoDescriptionHeaderRenderer': _extract_from_video_header_renderer,
+ 'videoDescriptionMusicSectionRenderer': _extract_from_music_renderer,
+ 'expandableVideoDescriptionRenderer': _extract_from_description_renderer,
+ 'metadataRowContainerRenderer': _extract_metadata_row_info,
+ # OR just this one, which contains SOME of the above inside it
+ 'slimVideoMetadataRenderer': _extract_from_video_metadata,
+}
+
+def _extract_watch_info_mobile(top_level):
+ '''Scrapes information from the visible page'''
+ info = {}
+ response = top_level.get('response', {})
+
+ # this renderer has the stuff visible on the page
+ # check for playlist
+ items, _ = extract_items(response,
+ item_types={'singleColumnWatchNextResults'})
+ if items:
+ watch_next_results = items[0]['singleColumnWatchNextResults']
+ playlist = deep_get(watch_next_results, 'playlist', 'playlist')
+ if playlist is None:
+ info['playlist'] = None
+ else:
+ info['playlist'] = {}
+ info['playlist']['title'] = playlist.get('title')
+ info['playlist']['author'] = extract_str(multi_get(playlist,
+ 'ownerName', 'longBylineText', 'shortBylineText', 'ownerText'))
+ author_id = deep_get(playlist, 'longBylineText', 'runs', 0,
+ 'navigationEndpoint', 'browseEndpoint', 'browseId')
+ info['playlist']['author_id'] = author_id
+ info['playlist']['author_url'] = concat_or_none(
+ 'https://www.youtube.com/channel/', author_id)
+ info['playlist']['id'] = playlist.get('playlistId')
+ info['playlist']['url'] = concat_or_none(
+ 'https://www.youtube.com/playlist?list=',
+ info['playlist']['id'])
+ info['playlist']['video_count'] = playlist.get('totalVideos')
+ info['playlist']['current_index'] = playlist.get('currentIndex')
+ info['playlist']['items'] = [
+ extract_item_info(i) for i in playlist.get('contents', ())]
+ else:
+ info['playlist'] = None
+
+ # use dispatch table to get information scattered in various renderers
+ items, _ = extract_items(
+ response,
+ item_types=visible_extraction_dispatch.keys(),
+ search_engagement_panels=True
+ )
+ found = set()
+ for renderer in items:
+ name, renderer_content = list(renderer.items())[0]
+ found.add(name)
+ liberal_dict_update(
+ info,
+ visible_extraction_dispatch[name](renderer_content)
+ )
+ # Call the function on blank dict for any that weren't found
+ # so that the empty keys get added
+ for name in visible_extraction_dispatch.keys() - found:
+ liberal_dict_update(info, visible_extraction_dispatch[name]({}))
+
+ # comment section info
+ items, _ = extract_items(response, item_types={
+ 'commentSectionRenderer', 'commentsEntryPointHeaderRenderer'})
+ if items:
+ header_type = list(items[0])[0]
+ comment_info = items[0][header_type]
+ # This seems to be some kind of A/B test being done on mobile, where
+ # this is present instead of the normal commentSectionRenderer. It can
+ # be seen here:
+ # https://www.androidpolice.com/2019/10/31/google-youtube-app-comment-section-below-videos/
+ # https://www.youtube.com/watch?v=bR5Q-wD-6qo
+ if header_type == 'commentsEntryPointHeaderRenderer':
+ comment_count_text = extract_str(multi_get(
+ comment_info, 'commentCount', 'headerText'))
+ else:
+ comment_count_text = extract_str(deep_get(comment_info,
+ 'header', 'commentSectionHeaderRenderer', 'countText'))
+ if comment_count_text == 'Comments': # just this with no number, means 0 comments
+ info['comment_count'] = '0'
+ else:
+ info['comment_count'] = extract_approx_int(comment_count_text)
+ info['comments_disabled'] = False
+ else: # no comment section present means comments are disabled
+ info['comment_count'] = '0'
+ info['comments_disabled'] = True
+
+ # check for limited state
+ items, _ = extract_items(response, item_types={'limitedStateMessageRenderer'})
+ if items:
+ info['limited_state'] = True
+ else:
+ info['limited_state'] = False
+
+ # related videos
+ related, _ = extract_items(response)
+ info['related_videos'] = [extract_item_info(renderer) for renderer in related]
+
+ return info
+
+def _extract_watch_info_desktop(top_level):
+ info = {
+ 'comment_count': None,
+ 'comments_disabled': None,
+ 'limited_state': None,
+ 'playlist': None,
+ }
+
+ video_info = {}
+ for renderer in deep_get(top_level, 'response', 'contents', 'twoColumnWatchNextResults', 'results', 'results', 'contents', default=()):
+ if renderer and list(renderer.keys())[0] in ('videoPrimaryInfoRenderer', 'videoSecondaryInfoRenderer'):
+ video_info.update(list(renderer.values())[0])
+
+ info.update(_extract_metadata_row_info(video_info))
+ info['description'] = extract_str(video_info.get('description', None), recover_urls=True)
+ info['time_published'] = extract_date(extract_str(video_info.get('dateText', None)))
+
+ likes_dislikes = deep_get(video_info, 'sentimentBar', 'sentimentBarRenderer', 'tooltip', default='').split('/')
+ if len(likes_dislikes) == 2:
+ info['like_count'] = extract_int(likes_dislikes[0])
+ info['dislike_count'] = extract_int(likes_dislikes[1])
+ else:
+ info['like_count'] = None
+ info['dislike_count'] = None
+
+ info['title'] = extract_str(video_info.get('title', None))
+ info['author'] = extract_str(deep_get(video_info, 'owner', 'videoOwnerRenderer', 'title'))
+ info['author_id'] = deep_get(video_info, 'owner', 'videoOwnerRenderer', 'navigationEndpoint', 'browseEndpoint', 'browseId')
+ info['view_count'] = extract_int(extract_str(deep_get(video_info, 'viewCount', 'videoViewCountRenderer', 'viewCount')))
+
+ related = deep_get(top_level, 'response', 'contents', 'twoColumnWatchNextResults', 'secondaryResults', 'secondaryResults', 'results', default=[])
+ info['related_videos'] = [extract_item_info(renderer) for renderer in related]
+
+ return info
+
+def update_format_with_codec_info(fmt, codec):
+ if any(codec.startswith(c) for c in ('av', 'vp', 'h263', 'h264', 'mp4v')):
+ if codec == 'vp8.0':
+ codec = 'vp8'
+ conservative_update(fmt, 'vcodec', codec)
+ elif (codec.startswith('mp4a')
+ or codec in ('opus', 'mp3', 'aac', 'dtse', 'ec-3', 'vorbis',
+ 'ac-3')):
+ conservative_update(fmt, 'acodec', codec)
+ else:
+ print('Warning: unrecognized codec: ' + codec)
+
+fmt_type_re = re.compile(
+ r'(text|audio|video)/([\w0-9]+); codecs="([^"]+)"')
+def update_format_with_type_info(fmt, yt_fmt):
+ # 'type' for invidious api format
+ mime_type = multi_get(yt_fmt, 'mimeType', 'type')
+ if mime_type is None:
+ return
+ match = re.fullmatch(fmt_type_re, mime_type)
+ if match is None:
+ print('Warning: Could not read mimetype', mime_type)
+ return
+ type, fmt['ext'], codecs = match.groups()
+ codecs = codecs.split(', ')
+ for codec in codecs:
+ update_format_with_codec_info(fmt, codec)
+ if type == 'audio':
+ assert len(codecs) == 1
+
+def _extract_formats(info, player_response):
+ streaming_data = player_response.get('streamingData', {})
+ yt_formats = streaming_data.get('formats', []) + streaming_data.get('adaptiveFormats', [])
+
+ info['formats'] = []
+ # because we may retry the extract_formats with a different player_response
+ # so keep what we have
+ conservative_update(info, 'hls_manifest_url',
+ streaming_data.get('hlsManifestUrl'))
+ conservative_update(info, 'dash_manifest_url',
+ streaming_data.get('dash_manifest_url'))
+
+ for yt_fmt in yt_formats:
+ itag = yt_fmt.get('itag')
+
+ # Translated audio track
+ # Example: https://www.youtube.com/watch?v=gF9kkB0UWYQ
+ # Only get the original language for now so a foreign
+ # translation will not be picked just because it comes first
+ if deep_get(yt_fmt, 'audioTrack', 'audioIsDefault') is False:
+ continue
+
+ fmt = {}
+ fmt['itag'] = itag
+ fmt['ext'] = None
+ fmt['audio_bitrate'] = None
+ fmt['bitrate'] = yt_fmt.get('bitrate')
+ fmt['acodec'] = None
+ fmt['vcodec'] = None
+ fmt['width'] = yt_fmt.get('width')
+ fmt['height'] = yt_fmt.get('height')
+ fmt['file_size'] = extract_int(yt_fmt.get('contentLength'))
+ fmt['audio_sample_rate'] = extract_int(yt_fmt.get('audioSampleRate'))
+ fmt['duration_ms'] = yt_fmt.get('approxDurationMs')
+ fmt['fps'] = yt_fmt.get('fps')
+ fmt['init_range'] = yt_fmt.get('initRange')
+ fmt['index_range'] = yt_fmt.get('indexRange')
+ for key in ('init_range', 'index_range'):
+ if fmt[key]:
+ fmt[key]['start'] = int(fmt[key]['start'])
+ fmt[key]['end'] = int(fmt[key]['end'])
+ update_format_with_type_info(fmt, yt_fmt)
+ cipher = dict(urllib.parse.parse_qsl(multi_get(yt_fmt,
+ 'cipher', 'signatureCipher', default='')))
+ if cipher:
+ fmt['url'] = cipher.get('url')
+ else:
+ fmt['url'] = yt_fmt.get('url')
+ fmt['s'] = cipher.get('s')
+ fmt['sp'] = cipher.get('sp')
+
+ # update with information from big table
+ hardcoded_itag_info = _formats.get(str(itag), {})
+ for key, value in hardcoded_itag_info.items():
+ conservative_update(fmt, key, value) # prefer info from YouTube
+ fmt['quality'] = hardcoded_itag_info.get('height')
+ conservative_update(
+ fmt, 'quality',
+ extract_int(yt_fmt.get('quality'), whole_word=False)
+ )
+ conservative_update(
+ fmt, 'quality',
+ extract_int(yt_fmt.get('qualityLabel'), whole_word=False)
+ )
+
+ info['formats'].append(fmt)
+
+ # get ip address
+ if info['formats']:
+ query_string = (info['formats'][0].get('url') or '?').split('?')[1]
+ info['ip_address'] = deep_get(
+ urllib.parse.parse_qs(query_string), 'ip', 0)
+ else:
+ info['ip_address'] = None
+
+hls_regex = re.compile(r'[\w_-]+=(?:"[^"]+"|[^",]+),')
+def extract_hls_formats(hls_manifest):
+ '''returns hls_formats, err'''
+ hls_formats = []
+ try:
+ lines = hls_manifest.splitlines()
+ i = 0
+ while i < len(lines):
+ if lines[i].startswith('#EXT-X-STREAM-INF'):
+ fmt = {'acodec': None, 'vcodec': None, 'height': None,
+ 'width': None, 'fps': None, 'audio_bitrate': None,
+ 'itag': None, 'file_size': None, 'duration_ms': None,
+ 'audio_sample_rate': None, 'url': None}
+ properties = lines[i].split(':')[1]
+ properties += ',' # make regex work for last key-value pair
+
+ for pair in hls_regex.findall(properties):
+ key, value = pair.rstrip(',').split('=')
+ if key == 'CODECS':
+ for codec in value.strip('"').split(','):
+ update_format_with_codec_info(fmt, codec)
+ elif key == 'RESOLUTION':
+ fmt['width'], fmt['height'] = map(int, value.split('x'))
+ fmt['resolution'] = value
+ elif key == 'FRAME-RATE':
+ fmt['fps'] = int(value)
+ i += 1
+ fmt['url'] = lines[i]
+ assert fmt['url'].startswith('http')
+ fmt['ext'] = 'm3u8'
+ hls_formats.append(fmt)
+ i += 1
+ except Exception as e:
+ traceback.print_exc()
+ return [], str(e)
+ return hls_formats, None
+
+
+def _extract_playability_error(info, player_response, error_prefix=''):
+ if info['formats']:
+ info['playability_status'] = None
+ info['playability_error'] = None
+ return
+
+ playability_status = deep_get(player_response, 'playabilityStatus', 'status', default=None)
+ info['playability_status'] = playability_status
+
+ playability_reason = extract_str(multi_deep_get(player_response,
+ ['playabilityStatus', 'reason'],
+ ['playabilityStatus', 'errorScreen', 'playerErrorMessageRenderer', 'reason'],
+ default='Could not find playability error')
+ )
+
+ if playability_status not in (None, 'OK'):
+ info['playability_error'] = error_prefix + playability_reason
+ elif not info['playability_error']: # do not override
+ info['playability_error'] = error_prefix + 'Unknown playability error'
+
+SUBTITLE_FORMATS = ('srv1', 'srv2', 'srv3', 'ttml', 'vtt')
+def extract_watch_info(polymer_json):
+ info = {'playability_error': None, 'error': None,
+ 'player_response_missing': None}
+
+ if isinstance(polymer_json, dict):
+ top_level = polymer_json
+ elif isinstance(polymer_json, (list, tuple)):
+ top_level = {}
+ for page_part in polymer_json:
+ if not isinstance(page_part, dict):
+ return {'error': 'Invalid page part'}
+ top_level.update(page_part)
+ else:
+ return {'error': 'Invalid top level polymer data'}
+
+ error = check_missing_keys(top_level,
+ ['player', 'args'],
+ ['player', 'assets', 'js'],
+ ['playerResponse'],
+ )
+ if error:
+ info['playability_error'] = error
+
+ player_response = top_level.get('playerResponse', {})
+
+ # usually, only the embedded one has the urls
+ player_args = deep_get(top_level, 'player', 'args', default={})
+ if 'player_response' in player_args:
+ embedded_player_response = json.loads(player_args['player_response'])
+ else:
+ embedded_player_response = {}
+
+ # captions
+ info['automatic_caption_languages'] = []
+ info['manual_caption_languages'] = []
+ info['_manual_caption_language_names'] = {} # language name written in that language, needed in some cases to create the url
+ info['translation_languages'] = []
+ captions_info = player_response.get('captions', {})
+ info['_captions_base_url'] = normalize_url(deep_get(captions_info, 'playerCaptionsRenderer', 'baseUrl'))
+ # Sometimes the above playerCaptionsRender is randomly missing
+ # Extract base_url from one of the captions by removing lang specifiers
+ if not info['_captions_base_url']:
+ base_url = normalize_url(deep_get(
+ captions_info,
+ 'playerCaptionsTracklistRenderer',
+ 'captionTracks',
+ 0,
+ 'baseUrl'
+ ))
+ if base_url:
+ url_parts = urllib.parse.urlparse(base_url)
+ qs = urllib.parse.parse_qs(url_parts.query)
+ for key in ('tlang', 'lang', 'name', 'kind', 'fmt'):
+ if key in qs:
+ del qs[key]
+ base_url = urllib.parse.urlunparse(url_parts._replace(
+ query=urllib.parse.urlencode(qs, doseq=True)))
+ info['_captions_base_url'] = base_url
+ for caption_track in deep_get(captions_info, 'playerCaptionsTracklistRenderer', 'captionTracks', default=()):
+ lang_code = caption_track.get('languageCode')
+ if not lang_code:
+ continue
+ if caption_track.get('kind') == 'asr':
+ info['automatic_caption_languages'].append(lang_code)
+ else:
+ info['manual_caption_languages'].append(lang_code)
+ base_url = caption_track.get('baseUrl', '')
+ lang_name = deep_get(urllib.parse.parse_qs(urllib.parse.urlparse(base_url).query), 'name', 0)
+ if lang_name:
+ info['_manual_caption_language_names'][lang_code] = lang_name
+
+ for translation_lang_info in deep_get(captions_info, 'playerCaptionsTracklistRenderer', 'translationLanguages', default=()):
+ lang_code = translation_lang_info.get('languageCode')
+ if lang_code:
+ info['translation_languages'].append(lang_code)
+ if translation_lang_info.get('isTranslatable') == False:
+ print('WARNING: Found non-translatable caption language')
+
+ # formats
+ _extract_formats(info, embedded_player_response)
+ if not info['formats']:
+ _extract_formats(info, player_response)
+
+ # see https://github.com/user234683/youtube-local/issues/22#issuecomment-706395160
+ info['player_urls_missing'] = (
+ not info['formats'] and not embedded_player_response)
+
+ # playability errors
+ _extract_playability_error(info, player_response)
+
+ # check age-restriction
+ info['age_restricted'] = (info['playability_status'] == 'LOGIN_REQUIRED' and info['playability_error'] and ' age' in info['playability_error'])
+
+ # base_js (for decryption of signatures)
+ info['base_js'] = deep_get(top_level, 'player', 'assets', 'js')
+ if info['base_js']:
+ info['base_js'] = normalize_url(info['base_js'])
+ # must uniquely identify url
+ info['player_name'] = urllib.parse.urlparse(info['base_js']).path
+ else:
+ info['player_name'] = None
+
+ # extract stuff from visible parts of page
+ mobile = 'singleColumnWatchNextResults' in deep_get(top_level, 'response', 'contents', default={})
+ if mobile:
+ info.update(_extract_watch_info_mobile(top_level))
+ else:
+ info.update(_extract_watch_info_desktop(top_level))
+
+ # stuff from videoDetails. Use liberal_update to prioritize info from videoDetails over existing info
+ vd = deep_get(top_level, 'playerResponse', 'videoDetails', default={})
+ liberal_update(info, 'title', extract_str(vd.get('title')))
+ liberal_update(info, 'duration', extract_int(vd.get('lengthSeconds')))
+ liberal_update(info, 'view_count', extract_int(vd.get('viewCount')))
+ # videos with no description have a blank string
+ liberal_update(info, 'description', vd.get('shortDescription'))
+ liberal_update(info, 'id', vd.get('videoId'))
+ liberal_update(info, 'author', vd.get('author'))
+ liberal_update(info, 'author_id', vd.get('channelId'))
+ info['was_live'] = vd.get('isLiveContent')
+ conservative_update(info, 'unlisted', not vd.get('isCrawlable', True)) #isCrawlable is false on limited state videos even if they aren't unlisted
+ liberal_update(info, 'tags', vd.get('keywords', []))
+
+ # fallback stuff from microformat
+ mf = deep_get(top_level, 'playerResponse', 'microformat', 'playerMicroformatRenderer', default={})
+ conservative_update(info, 'title', extract_str(mf.get('title')))
+ conservative_update(info, 'duration', extract_int(mf.get('lengthSeconds')))
+ # this gives the view count for limited state videos
+ conservative_update(info, 'view_count', extract_int(mf.get('viewCount')))
+ conservative_update(info, 'description', extract_str(mf.get('description'), recover_urls=True))
+ conservative_update(info, 'author', mf.get('ownerChannelName'))
+ conservative_update(info, 'author_id', mf.get('externalChannelId'))
+ conservative_update(info, 'live', deep_get(mf, 'liveBroadcastDetails',
+ 'isLiveNow'))
+ liberal_update(info, 'unlisted', mf.get('isUnlisted'))
+ liberal_update(info, 'category', mf.get('category'))
+ liberal_update(info, 'time_published', mf.get('publishDate'))
+ liberal_update(info, 'time_uploaded', mf.get('uploadDate'))
+ family_safe = mf.get('isFamilySafe')
+ if family_safe is None:
+ conservative_update(info, 'age_restricted', None)
+ else:
+ conservative_update(info, 'age_restricted', not family_safe)
+ info['allowed_countries'] = mf.get('availableCountries', [])
+
+ # other stuff
+ info['author_url'] = 'https://www.youtube.com/channel/' + info['author_id'] if info['author_id'] else None
+ info['storyboard_spec_url'] = deep_get(player_response, 'storyboards', 'playerStoryboardSpecRenderer', 'spec')
+
+ return info
+
+single_char_codes = {
+ 'n': '\n',
+ '\\': '\\',
+ '"': '"',
+ "'": "'",
+ 'b': '\b',
+ 'f': '\f',
+ 'n': '\n',
+ 'r': '\r',
+ 't': '\t',
+ 'v': '\x0b',
+ '0': '\x00',
+ '\n': '', # backslash followed by literal newline joins lines
+}
+def js_escape_replace(match):
+ r'''Resolves javascript string escape sequences such as \x..'''
+ # some js-strings in the watch page html include them for no reason
+ # https://mathiasbynens.be/notes/javascript-escapes
+ escaped_sequence = match.group(1)
+ if escaped_sequence[0] in ('x', 'u'):
+ return chr(int(escaped_sequence[1:], base=16))
+
+ # In javascript, if it's not one of those escape codes, it's just the
+ # literal character. e.g., "\a" = "a"
+ return single_char_codes.get(escaped_sequence, escaped_sequence)
+
+# works but complicated and unsafe:
+#PLAYER_RESPONSE_RE = re.compile(r'<script[^>]*?>[^<]*?var ytInitialPlayerResponse = ({(?:"(?:[^"\\]|\\.)*?"|[^"])+?});')
+
+# Because there are sometimes additional statements after the json object
+# so we just capture all of those until end of script and tell json decoder
+# to ignore extra stuff after the json object
+PLAYER_RESPONSE_RE = re.compile(r'<script[^>]*?>[^<]*?var ytInitialPlayerResponse = ({.*?)</script>')
+INITIAL_DATA_RE = re.compile(r"<script[^>]*?>var ytInitialData = '(.+?[^\\])';")
+BASE_JS_RE = re.compile(r'jsUrl":\s*"([\w\-\./]+?/base.js)"')
+JS_STRING_ESCAPE_RE = re.compile(r'\\([^xu]|x..|u....)')
+def extract_watch_info_from_html(watch_html):
+ base_js_match = BASE_JS_RE.search(watch_html)
+ player_response_match = PLAYER_RESPONSE_RE.search(watch_html)
+ initial_data_match = INITIAL_DATA_RE.search(watch_html)
+
+ if base_js_match is not None:
+ base_js_url = base_js_match.group(1)
+ else:
+ base_js_url = None
+
+ if player_response_match is not None:
+ decoder = json.JSONDecoder()
+ # this will make it ignore extra stuff after end of object
+ player_response = decoder.raw_decode(player_response_match.group(1))[0]
+ else:
+ return {'error': 'Could not find ytInitialPlayerResponse'}
+ player_response = None
+
+ if initial_data_match is not None:
+ initial_data = initial_data_match.group(1)
+ initial_data = JS_STRING_ESCAPE_RE.sub(js_escape_replace, initial_data)
+ initial_data = json.loads(initial_data)
+ else:
+ print('extract_watch_info_from_html: failed to find initialData')
+ initial_data = None
+
+ # imitate old format expected by extract_watch_info
+ fake_polymer_json = {
+ 'player': {
+ 'args': {},
+ 'assets': {
+ 'js': base_js_url
+ }
+ },
+ 'playerResponse': player_response,
+ 'response': initial_data,
+ }
+
+ return extract_watch_info(fake_polymer_json)
+
+
+def captions_available(info):
+ return bool(info['_captions_base_url'])
+
+
+def get_caption_url(info, language, format, automatic=False, translation_language=None):
+ '''Gets the url for captions with the given language and format. If automatic is True, get the automatic captions for that language. If translation_language is given, translate the captions from `language` to `translation_language`. If automatic is true and translation_language is given, the automatic captions will be translated.'''
+ url = info['_captions_base_url']
+ if not url:
+ return None
+ url += '&lang=' + language
+ url += '&fmt=' + format
+ if automatic:
+ url += '&kind=asr'
+ elif language in info['_manual_caption_language_names']:
+ url += '&name=' + urllib.parse.quote(info['_manual_caption_language_names'][language], safe='')
+
+ if translation_language:
+ url += '&tlang=' + translation_language
+ return url
+
+def update_with_new_urls(info, player_response):
+ '''Inserts urls from player_response json'''
+ ERROR_PREFIX = 'Error getting missing player or bypassing age-restriction: '
+
+ try:
+ player_response = json.loads(player_response)
+ except json.decoder.JSONDecodeError:
+ traceback.print_exc()
+ info['playability_error'] = ERROR_PREFIX + 'Failed to parse json response'
+ return
+
+ _extract_formats(info, player_response)
+ _extract_playability_error(info, player_response, error_prefix=ERROR_PREFIX)
+
+def requires_decryption(info):
+ return ('formats' in info) and info['formats'] and info['formats'][0]['s']
+
+# adapted from youtube-dl and invidious:
+# https://github.com/omarroth/invidious/blob/master/src/invidious/helpers/signatures.cr
+decrypt_function_re = re.compile(r'function\(a\)\{(a=a\.split\(""\)[^\}{]+)return a\.join\(""\)\}')
+# gives us e.g. rt, .xK, 5 from rt.xK(a,5) or rt, ["xK"], 5 from rt["xK"](a,5)
+# (var, operation, argument)
+var_op_arg_re = re.compile(r'(\w+)(\.\w+|\["[^"]+"\])\(a,(\d+)\)')
+def extract_decryption_function(info, base_js):
+ '''Insert decryption function into info. Return error string if not successful.
+ Decryption function is a list of list[2] of numbers.
+ It is advisable to cache the decryption function (uniquely identified by info['player_name']) so base.js (1 MB) doesn't need to be redownloaded each time'''
+ info['decryption_function'] = None
+ decrypt_function_match = decrypt_function_re.search(base_js)
+ if decrypt_function_match is None:
+ return 'Could not find decryption function in base.js'
+
+ function_body = decrypt_function_match.group(1).split(';')[1:-1]
+ if not function_body:
+ return 'Empty decryption function body'
+
+ var_with_operation_match = var_op_arg_re.fullmatch(function_body[0])
+ if var_with_operation_match is None:
+ return 'Could not find var_name'
+
+ var_name = var_with_operation_match.group(1)
+ var_body_match = re.search(r'var ' + re.escape(var_name) + r'=\{(.*?)\};', base_js, flags=re.DOTALL)
+ if var_body_match is None:
+ return 'Could not find var_body'
+
+ operations = var_body_match.group(1).replace('\n', '').split('},')
+ if not operations:
+ return 'Did not find any definitions in var_body'
+ operations[-1] = operations[-1][:-1] # remove the trailing '}' since we split by '},' on the others
+ operation_definitions = {}
+ for op in operations:
+ colon_index = op.find(':')
+ opening_brace_index = op.find('{')
+
+ if colon_index == -1 or opening_brace_index == -1:
+ return 'Could not parse operation'
+ op_name = op[:colon_index]
+ op_body = op[opening_brace_index+1:]
+ if op_body == 'a.reverse()':
+ operation_definitions[op_name] = 0
+ elif op_body == 'a.splice(0,b)':
+ operation_definitions[op_name] = 1
+ elif op_body.startswith('var c=a[0]'):
+ operation_definitions[op_name] = 2
+ else:
+ return 'Unknown op_body: ' + op_body
+
+ decryption_function = []
+ for op_with_arg in function_body:
+ match = var_op_arg_re.fullmatch(op_with_arg)
+ if match is None:
+ return 'Could not parse operation with arg'
+ op_name = match.group(2).strip('[].')
+ if op_name not in operation_definitions:
+ return 'Unknown op_name: ' + str(op_name)
+ op_argument = match.group(3)
+ decryption_function.append([operation_definitions[op_name], int(op_argument)])
+
+ info['decryption_function'] = decryption_function
+ return False
+
+def _operation_2(a, b):
+ c = a[0]
+ a[0] = a[b % len(a)]
+ a[b % len(a)] = c
+
+def decrypt_signatures(info):
+ '''Applies info['decryption_function'] to decrypt all the signatures. Return err.'''
+ if not info.get('decryption_function'):
+ return 'decryption_function not in info'
+ for format in info['formats']:
+ if not format['s'] or not format['sp'] or not format['url']:
+ print('Warning: s, sp, or url not in format')
+ continue
+
+ a = list(format['s'])
+ for op, argument in info['decryption_function']:
+ if op == 0:
+ a.reverse()
+ elif op == 1:
+ a = a[argument:]
+ else:
+ _operation_2(a, argument)
+
+ signature = ''.join(a)
+ format['url'] += '&' + format['sp'] + '=' + signature
+ return False