diff options
author | Biswakalyan Bhuyan <biswa@surgot.in> | 2024-09-19 15:33:11 +0530 |
---|---|---|
committer | Biswakalyan Bhuyan <biswa@surgot.in> | 2024-09-19 15:33:11 +0530 |
commit | a4e01da27c08e43a67b2618ad1e71c1f8f86d5cd (patch) | |
tree | 5b8f407dbb7e9d1ab2106ac0cc8564897e7a2098 /youtube/yt_data_extract/common.py | |
download | yt-local-a4e01da27c08e43a67b2618ad1e71c1f8f86d5cd.tar.gz yt-local-a4e01da27c08e43a67b2618ad1e71c1f8f86d5cd.tar.bz2 yt-local-a4e01da27c08e43a67b2618ad1e71c1f8f86d5cd.zip |
Diffstat (limited to 'youtube/yt_data_extract/common.py')
-rw-r--r-- | youtube/yt_data_extract/common.py | 610 |
1 files changed, 610 insertions, 0 deletions
diff --git a/youtube/yt_data_extract/common.py b/youtube/yt_data_extract/common.py new file mode 100644 index 0000000..7903db5 --- /dev/null +++ b/youtube/yt_data_extract/common.py @@ -0,0 +1,610 @@ +import re +import urllib.parse +import collections +import collections.abc + +def get(object, key, default=None, types=()): + '''Like dict.get(), but returns default if the result doesn't match one of the types. + Also works for indexing lists.''' + try: + result = object[key] + except (TypeError, IndexError, KeyError): + return default + + if not types or isinstance(result, types): + return result + else: + return default + +def multi_get(object, *keys, default=None, types=()): + '''Like get, but try other keys if the first fails''' + for key in keys: + try: + result = object[key] + except (TypeError, IndexError, KeyError): + pass + else: + if not types or isinstance(result, types): + return result + else: + continue + return default + + +def deep_get(object, *keys, default=None, types=()): + '''Like dict.get(), but for nested dictionaries/sequences, supporting keys or indices. + Last argument is the default value to use in case of any IndexErrors or KeyErrors. + If types is given and the result doesn't match one of those types, default is returned''' + try: + for key in keys: + object = object[key] + except (TypeError, IndexError, KeyError): + return default + else: + if not types or isinstance(object, types): + return object + else: + return default + +def multi_deep_get(object, *key_sequences, default=None, types=()): + '''Like deep_get, but can try different key sequences in case one fails. + Return default if all of them fail. key_sequences is a list of lists''' + for key_sequence in key_sequences: + _object = object + try: + for key in key_sequence: + _object = _object[key] + except (TypeError, IndexError, KeyError): + pass + else: + if not types or isinstance(_object, types): + return _object + else: + continue + return default + + +def _is_empty(value): + '''Determines if value is None or an empty iterable, such as '' and []''' + if value is None: + return True + elif isinstance(value, collections.abc.Iterable) and not value: + return True + return False + + +def liberal_update(obj, key, value): + '''Updates obj[key] with value as long as value is not None or empty. + Ensures obj[key] will at least get an empty value, however''' + if (not _is_empty(value)) or (key not in obj): + obj[key] = value + +def conservative_update(obj, key, value): + '''Only updates obj if it doesn't have key or obj[key] is None/empty''' + if _is_empty(obj.get(key)): + obj[key] = value + + +def liberal_dict_update(dict1, dict2): + '''Update dict1 with keys from dict2 using liberal_update''' + for key, value in dict2.items(): + liberal_update(dict1, key, value) + + +def conservative_dict_update(dict1, dict2): + '''Update dict1 with keys from dict2 using conservative_update''' + for key, value in dict2.items(): + conservative_update(dict1, key, value) + + +def concat_or_none(*strings): + '''Concatenates strings. Returns None if any of the arguments are None''' + result = '' + for string in strings: + if string is None: + return None + result += string + return result + +def remove_redirect(url): + if url is None: + return None + if re.fullmatch(r'(((https?:)?//)?(www.)?youtube.com)?/redirect\?.*', url) is not None: # YouTube puts these on external links to do tracking + query_string = url[url.find('?')+1: ] + return urllib.parse.parse_qs(query_string)['q'][0] + return url + +norm_url_re = re.compile(r'^(?:(?:https?:)?//)?((?:[\w-]+\.)+[\w-]+)?(/.*)$') +def normalize_url(url): + '''Insert https, resolve relative paths for youtube.com, and put www. infront of youtube.com''' + if url is None: + return None + match = norm_url_re.fullmatch(url) + if match is None: + raise Exception(url) + + domain = match.group(1) or 'www.youtube.com' + if domain == 'youtube.com': + domain = 'www.youtube.com' + + return 'https://' + domain + match.group(2) + +def _recover_urls(runs): + for run in runs: + url = deep_get(run, 'navigationEndpoint', 'urlEndpoint', 'url') + text = run.get('text', '') + # second condition is necessary because YouTube makes other things into urls, such as hashtags, which we want to keep as text + if url is not None and (text.startswith('http://') or text.startswith('https://')): + url = remove_redirect(url) + run['url'] = url + run['text'] = url # YouTube truncates the url text, use actual url instead + +def extract_str(node, default=None, recover_urls=False): + '''default is the value returned if the extraction fails. If recover_urls is true, will attempt to fix YouTube's truncation of url text (most prominently seen in descriptions)''' + if isinstance(node, str): + return node + + try: + return node['simpleText'] + except (KeyError, TypeError): + pass + + if isinstance(node, dict) and 'runs' in node: + if recover_urls: + _recover_urls(node['runs']) + return ''.join(text_run.get('text', '') for text_run in node['runs']) + + return default + +def extract_formatted_text(node): + if not node: + return [] + if 'runs' in node: + _recover_urls(node['runs']) + return node['runs'] + elif 'simpleText' in node: + return [{'text': node['simpleText']}] + return [] + +def extract_int(string, default=None, whole_word=True): + if isinstance(string, int): + return string + if not isinstance(string, str): + string = extract_str(string) + if not string: + return default + if whole_word: + match = re.search(r'\b(\d+)\b', string.replace(',', '')) + else: + match = re.search(r'(\d+)', string.replace(',', '')) + if match is None: + return default + try: + return int(match.group(1)) + except ValueError: + return default + +def extract_approx_int(string): + '''e.g. "15.1M" from "15.1M subscribers" or '4,353' from 4353''' + if not isinstance(string, str): + string = extract_str(string) + if not string: + return None + match = re.search(r'\b(\d+(?:\.\d+)?[KMBTkmbt]?)\b', string.replace(',', '')) + if match is None: + return None + result = match.group(1) + if re.fullmatch(r'\d+', result): + result = '{:,}'.format(int(result)) + return result + +MONTH_ABBREVIATIONS = {'jan':'1', 'feb':'2', 'mar':'3', 'apr':'4', 'may':'5', 'jun':'6', 'jul':'7', 'aug':'8', 'sep':'9', 'oct':'10', 'nov':'11', 'dec':'12'} +def extract_date(date_text): + '''Input: "Mar 9, 2019". Output: "2019-3-9"''' + if not isinstance(date_text, str): + date_text = extract_str(date_text) + if date_text is None: + return None + + date_text = date_text.replace(',', '').lower() + parts = date_text.split() + if len(parts) >= 3: + month, day, year = parts[-3:] + month = MONTH_ABBREVIATIONS.get(month[0:3]) # slicing in case they start writing out the full month name + if month and (re.fullmatch(r'\d\d?', day) is not None) and (re.fullmatch(r'\d{4}', year) is not None): + return year + '-' + month + '-' + day + return None + +def check_missing_keys(object, *key_sequences): + for key_sequence in key_sequences: + _object = object + try: + for key in key_sequence: + _object = _object[key] + except (KeyError, IndexError, TypeError): + return 'Could not find ' + key + + return None + +def extract_item_info(item, additional_info={}): + if not item: + return {'error': 'No item given'} + + type = get(list(item.keys()), 0) + if not type: + return {'error': 'Could not find type'} + item = item[type] + + info = {'error': None} + if type in ('itemSectionRenderer', 'compactAutoplayRenderer'): + return extract_item_info(deep_get(item, 'contents', 0), additional_info) + + if type in ('movieRenderer', 'clarificationRenderer'): + info['type'] = 'unsupported' + return info + + # type looks like e.g. 'compactVideoRenderer' or 'gridVideoRenderer' + # camelCase split, https://stackoverflow.com/a/37697078 + type_parts = [s.lower() for s in re.sub(r'([A-Z][a-z]+)', r' \1', type).split()] + if len(type_parts) < 2: + info['type'] = 'unsupported' + return + primary_type = type_parts[-2] + if primary_type == 'video': + info['type'] = 'video' + elif type_parts[0] == 'reel': # shorts + info['type'] = 'video' + primary_type = 'video' + elif primary_type in ('playlist', 'radio', 'show'): + info['type'] = 'playlist' + info['playlist_type'] = primary_type + elif primary_type == 'channel': + info['type'] = 'channel' + elif type == 'videoWithContextRenderer': # stupid exception + info['type'] = 'video' + primary_type = 'video' + else: + info['type'] = 'unsupported' + + # videoWithContextRenderer changes it to 'headline' just to be annoying + info['title'] = extract_str(multi_get(item, 'title', 'headline')) + if primary_type != 'channel': + info['author'] = extract_str(multi_get(item, 'longBylineText', 'shortBylineText', 'ownerText')) + info['author_id'] = extract_str(multi_deep_get(item, + ['longBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], + ['shortBylineText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'], + ['ownerText', 'runs', 0, 'navigationEndpoint', 'browseEndpoint', 'browseId'] + )) + info['author_url'] = ('https://www.youtube.com/channel/' + info['author_id']) if info['author_id'] else None + info['description'] = extract_formatted_text(multi_deep_get( + item, + ['descriptionText'], ['descriptionSnippet'], + ['detailedMetadataSnippets', 0, 'snippetText'], + )) + info['thumbnail'] = normalize_url(multi_deep_get(item, + ['thumbnail', 'thumbnails', 0, 'url'], # videos + ['thumbnails', 0, 'thumbnails', 0, 'url'], # playlists + ['thumbnailRenderer', 'showCustomThumbnailRenderer', 'thumbnail', 'thumbnails', 0, 'url'], # shows + )) + + info['badges'] = [] + for badge_node in multi_get(item, 'badges', 'ownerBadges', default=()): + badge = deep_get(badge_node, 'metadataBadgeRenderer', 'label') + if badge: + info['badges'].append(badge) + + if primary_type in ('video', 'playlist'): + info['time_published'] = None + timestamp = re.search(r'(\d+ \w+ ago)', + extract_str(item.get('publishedTimeText'), default='')) + if timestamp: + info['time_published'] = timestamp.group(1) + + if primary_type == 'video': + info['id'] = multi_deep_get(item, + ['videoId'], + ['navigationEndpoint', 'watchEndpoint', 'videoId'], + ['navigationEndpoint', 'reelWatchEndpoint', 'videoId'] # shorts + ) + info['view_count'] = extract_int(item.get('viewCountText')) + + # dig into accessibility data to get view_count for videos marked as recommended, and to get time_published + accessibility_label = multi_deep_get(item, + ['title', 'accessibility', 'accessibilityData', 'label'], + ['headline', 'accessibility', 'accessibilityData', 'label'], + default='') + timestamp = re.search(r'(\d+ \w+ ago)', accessibility_label) + if timestamp: + conservative_update(info, 'time_published', timestamp.group(1)) + view_count = re.search(r'(\d+) views', accessibility_label.replace(',', '')) + if view_count: + conservative_update(info, 'view_count', int(view_count.group(1))) + + if info['view_count']: + info['approx_view_count'] = '{:,}'.format(info['view_count']) + else: + info['approx_view_count'] = extract_approx_int(multi_get(item, + 'shortViewCountText', + 'viewCountText' # shorts + )) + + # handle case where it is "No views" + if not info['approx_view_count']: + if ('No views' in item.get('shortViewCountText', '') + or 'no views' in accessibility_label.lower() + or 'No views' in extract_str(item.get('viewCountText', '')) # shorts + ): + info['view_count'] = 0 + info['approx_view_count'] = '0' + + info['duration'] = extract_str(item.get('lengthText')) + + # dig into accessibility data to get duration for shorts + accessibility_label = deep_get(item, + 'accessibility', 'accessibilityData', 'label', + default='') + duration = re.search(r'(\d+) (second|seconds|minute) - play video$', + accessibility_label) + if duration: + if duration.group(2) == 'minute': + conservative_update(info, 'duration', '1:00') + else: + conservative_update(info, + 'duration', '0:' + duration.group(1).zfill(2)) + + # if it's an item in a playlist, get its index + if 'index' in item: # url has wrong index on playlist page + info['index'] = extract_int(item.get('index')) + elif 'indexText' in item: + # Current item in playlist has ▶ instead of the actual index, must + # dig into url + match = re.search(r'index=(\d+)', deep_get(item, + 'navigationEndpoint', 'commandMetadata', 'webCommandMetadata', + 'url', default='')) + if match is None: # worth a try then + info['index'] = extract_int(item.get('indexText')) + else: + info['index'] = int(match.group(1)) + else: + info['index'] = None + + elif primary_type in ('playlist', 'radio'): + info['id'] = item.get('playlistId') + info['video_count'] = extract_int(item.get('videoCount')) + info['first_video_id'] = deep_get(item, 'navigationEndpoint', + 'watchEndpoint', 'videoId') + elif primary_type == 'channel': + info['id'] = item.get('channelId') + info['approx_subscriber_count'] = extract_approx_int(item.get('subscriberCountText')) + elif primary_type == 'show': + info['id'] = deep_get(item, 'navigationEndpoint', 'watchEndpoint', 'playlistId') + info['first_video_id'] = deep_get(item, 'navigationEndpoint', + 'watchEndpoint', 'videoId') + + if primary_type in ('playlist', 'channel'): + conservative_update(info, 'video_count', extract_int(item.get('videoCountText'))) + + for overlay in item.get('thumbnailOverlays', []): + conservative_update(info, 'duration', extract_str(deep_get( + overlay, 'thumbnailOverlayTimeStatusRenderer', 'text' + ))) + # show renderers don't have videoCountText + conservative_update(info, 'video_count', extract_int(deep_get( + overlay, 'thumbnailOverlayBottomPanelRenderer', 'text' + ))) + + info.update(additional_info) + + return info + +def extract_response(polymer_json): + '''return response, error''' + # /youtubei/v1/browse endpoint returns response directly + if isinstance(polymer_json, dict) and 'responseContext' in polymer_json: + # this is the response + return polymer_json, None + + response = multi_deep_get(polymer_json, [1, 'response'], ['response']) + if response is None: + return None, 'Failed to extract response' + else: + return response, None + + +_item_types = { + 'movieRenderer', + 'didYouMeanRenderer', + 'showingResultsForRenderer', + + 'videoRenderer', + 'compactVideoRenderer', + 'compactAutoplayRenderer', + 'videoWithContextRenderer', + 'gridVideoRenderer', + 'playlistVideoRenderer', + + 'reelItemRenderer', + + 'playlistRenderer', + 'compactPlaylistRenderer', + 'gridPlaylistRenderer', + + 'radioRenderer', + 'compactRadioRenderer', + 'gridRadioRenderer', + + 'showRenderer', + 'compactShowRenderer', + 'gridShowRenderer', + + + 'channelRenderer', + 'compactChannelRenderer', + 'gridChannelRenderer', +} + +def _traverse_browse_renderer(renderer): + for tab in get(renderer, 'tabs', ()): + tab_renderer = multi_get(tab, 'tabRenderer', 'expandableTabRenderer') + if tab_renderer is None: + continue + if tab_renderer.get('selected', False): + return get(tab_renderer, 'content', {}) + print('Could not find tab with content') + return {} + +def _traverse_standard_list(renderer): + renderer_list = multi_get(renderer, 'contents', 'items', default=()) + continuation = deep_get(renderer, 'continuations', 0, 'nextContinuationData', 'continuation') + return renderer_list, continuation + +# these renderers contain one inside them +nested_renderer_dispatch = { + 'singleColumnBrowseResultsRenderer': _traverse_browse_renderer, + 'twoColumnBrowseResultsRenderer': _traverse_browse_renderer, + 'twoColumnSearchResultsRenderer': lambda r: get(r, 'primaryContents', {}), + 'richItemRenderer': lambda r: get(r, 'content', {}), + 'engagementPanelSectionListRenderer': lambda r: get(r, 'content', {}), +} + +# these renderers contain a list of renderers inside them +nested_renderer_list_dispatch = { + 'sectionListRenderer': _traverse_standard_list, + 'itemSectionRenderer': _traverse_standard_list, + 'gridRenderer': _traverse_standard_list, + 'richGridRenderer': _traverse_standard_list, + 'playlistVideoListRenderer': _traverse_standard_list, + 'structuredDescriptionContentRenderer': _traverse_standard_list, + 'slimVideoMetadataSectionRenderer': _traverse_standard_list, + 'singleColumnWatchNextResults': lambda r: (deep_get(r, 'results', 'results', 'contents', default=[]), None), +} +def get_nested_renderer_list_function(key): + if key in nested_renderer_list_dispatch: + return nested_renderer_list_dispatch[key] + elif key.endswith('Continuation'): + return _traverse_standard_list + return None + +def extract_items_from_renderer(renderer, item_types=_item_types): + ctoken = None + items = [] + + iter_stack = collections.deque() + current_iter = iter(()) + + while True: + # mode 1: get a new renderer by iterating. + # goes down the stack for an iterator if one has been exhausted + if not renderer: + try: + renderer = current_iter.__next__() + except StopIteration: + try: + current_iter = iter_stack.pop() + except IndexError: + return items, ctoken + # Get new renderer or check that the one we got is good before + # proceeding to mode 2 + continue + + + # mode 2: dig into the current renderer + key, value = list(renderer.items())[0] + + # the renderer is an item + if key in item_types: + items.append(renderer) + + # ctoken sometimes placed in these renderers, e.g. channel playlists + elif key == 'continuationItemRenderer': + cont = deep_get( + value, 'continuationEndpoint', 'continuationCommand', 'token' + ) + if cont: + ctoken = cont + + # has a list in it, add it to the iter stack + elif get_nested_renderer_list_function(key): + renderer_list, cont = get_nested_renderer_list_function(key)(value) + if renderer_list: + iter_stack.append(current_iter) + current_iter = iter(renderer_list) + if cont: + ctoken = cont + + # new renderer nested inside this one + elif key in nested_renderer_dispatch: + renderer = nested_renderer_dispatch[key](value) + continue # don't reset renderer to None + + renderer = None + + +def extract_items_from_renderer_list(renderers, item_types=_item_types): + '''Same as extract_items_from_renderer, but provide a list of renderers''' + items = [] + ctoken = None + for renderer in renderers: + new_items, new_ctoken = extract_items_from_renderer( + renderer, + item_types=item_types) + items += new_items + # prioritize ctoken associated with items + if (not ctoken) or (new_ctoken and new_items): + ctoken = new_ctoken + return items, ctoken + + +def extract_items(response, item_types=_item_types, + search_engagement_panels=False): + '''return items, ctoken''' + items = [] + ctoken = None + if 'continuationContents' in response: + # sometimes there's another, empty, junk [something]Continuation key + # find real one + for key, renderer_cont in get(response, + 'continuationContents', {}).items(): + # e.g. commentSectionContinuation, playlistVideoListContinuation + if key.endswith('Continuation'): + items, ctoken = extract_items_from_renderer( + {key: renderer_cont}, + item_types=item_types) + if items: + break + if ('onResponseReceivedEndpoints' in response + or 'onResponseReceivedActions' in response): + for endpoint in multi_get(response, + 'onResponseReceivedEndpoints', + 'onResponseReceivedActions', + []): + new_items, new_ctoken = extract_items_from_renderer_list( + multi_deep_get( + endpoint, + ['reloadContinuationItemsCommand', 'continuationItems'], + ['appendContinuationItemsAction', 'continuationItems'], + default=[] + ), + item_types=item_types, + ) + items += new_items + if (not ctoken) or (new_ctoken and new_items): + ctoken = new_ctoken + if 'contents' in response: + renderer = get(response, 'contents', {}) + new_items, new_ctoken = extract_items_from_renderer( + renderer, + item_types=item_types) + items += new_items + if (not ctoken) or (new_ctoken and new_items): + ctoken = new_ctoken + + if search_engagement_panels and 'engagementPanels' in response: + new_items, new_ctoken = extract_items_from_renderer_list( + response['engagementPanels'], item_types=item_types + ) + items += new_items + if (not ctoken) or (new_ctoken and new_items): + ctoken = new_ctoken + + return items, ctoken |