# This script first unzips the YouTube Takeout data into a directory. It then
# uses the YouTube Data API to fetch the metadata for each video in that data.
# The metadata includes the video title, description, length, views, and likes.
# Finally, the script saves the metadata to a CSV file.
"""
@author: Robert Vidigal, PhD (CSMaP-NYU)
"""
# Unzipping YouTube files
import shutil
import os
import zipfile
# Unpack a Takeout archive, rename the extracted folder, rewrite the archive
# from that folder, then delete both the folder and the archive.
# NOTE(review): indentation appears to have been lost in this section; the
# statements following the `if`, `with` and `for` headers presumably belong
# inside those blocks.
# NOTE(review): `zipname` is not defined in the visible code -- presumably the
# path of the Takeout .zip archive; confirm where it is set.

# Output directory name: the text of `zipname` before its first ".".
# A "." earlier in the path (e.g. "./export.zip") would cut the name short.
outdir = zipname.split(".")[0]
# Extract every member; with no `path` argument the zipfile documentation says
# files are written to the current working directory. The ZipFile object is
# not explicitly closed here.
zipfile.ZipFile(zipname).extractall()
# If the extraction produced a "Takeout" entry, rename it to `outdir`.
if os.path.exists("Takeout"):
os.rename("Takeout", outdir)
# NOTE(review): `pathlib` is not among the imports visible in this section --
# confirm it is imported elsewhere.
directory = pathlib.Path(outdir)
# Re-open the same path in write mode (mode="w" starts a new archive in place
# of the existing file) and add every path found recursively under `directory`.
# NOTE(review): the archive is opened with the default compression method, for
# which the zipfile documentation describes `compresslevel` as having no
# effect -- confirm whether ZIP_DEFLATED was intended.
with zipfile.ZipFile(zipname, mode="w") as archive:
for file_path in directory.rglob("*"):
archive.write(file_path, compresslevel=9)
# Clean up local files: remove the extracted directory tree and then the
# archive that was just rewritten above.
# NOTE(review): `self` is not defined in the visible code -- this looks lifted
# from a method; confirm which logger should be used here.
self.logger.info(f"Removing {zipname} from local machine")
shutil.rmtree(outdir)
os.remove(zipname)
### YouTube Data Takeout
from collections import Iterable
import datetime
import glob
import os
import requests
import pandas as pd
from tqdm import tqdm
from youtube_transcript_api import YouTubeTranscriptApi
# Get videoID from video URL
def strip_video_id_from_url(url):
    """Strip the video ID out of a YouTube URL.

    The URL is scanned for the following markers, in this order, and the
    first one present wins: ``watch?v=``, ``;v=`` (e.g. an HTML-escaped
    ``&amp;v=``), ``v=``, ``embed/`` and ``youtu.be/``.

    Args:
        url: The URL to inspect. ``None``, empty and non-string values
            (such as a pandas NaN) are accepted and yield ``None``.

    Returns:
        The (up to) 11 characters that follow the matched marker, or
        ``None`` when no marker is present.
    """
    if not url or not isinstance(url, str):
        return None

    # Markers are matched case-insensitively, but the ID is sliced out of the
    # original string because YouTube video IDs are case-sensitive.
    lowered = url.lower()
    markers = ('watch?v=', ';v=', 'v=', 'embed/', 'youtu.be/')
    for marker in markers:
        position = lowered.find(marker)
        if position != -1:
            start = position + len(marker)
            # Cutting at 11 characters drops trailing query parameters such
            # as "&t=10s" or "?t=10" for every URL shape, short links included.
            return url[start:start + 11]
    return None
# Gather every gzipped TSV export under the data folder, in sorted order.
data = glob.glob('---PATH---/*.tsv.gz')
data = sorted(data)

# Read each file into its own frame, then stack them into one table.
frames = []
for tsv_path in tqdm(data):
    frames.append(pd.read_csv(tsv_path, compression='gzip', sep='\t'))
df = pd.concat(frames)
df.shape

# Keep only www.youtube.com page views (excluding the bare ":443" root URL)
# and only the columns used downstream.
wanted_columns = [
    'caseid', 'session_start_time', 'start_time_utc',
    'date', 'time', 'page_domain', 'page_url', 'predecessor_url',
    'succesor_url', 'page_duration',
]
on_youtube = df['page_domain'] == 'www.youtube.com'
not_root_page = df['page_url'] != 'http://www.youtube.com:443'
df = df[on_youtube & not_root_page][wanted_columns].copy()
# Parsing video metadata
def parse_video_metadata(item):
    """Flatten one item of a YouTube ``videos`` API response into a dict.

    Args:
        item: One element of the response's ``items`` list. Anything that
            is not a ``dict`` is rejected.

    Returns:
        An empty dict when *item* is not a dict. Otherwise a dict with the
        keys ``video_id``, ``channel_title``, ``channel_id``,
        ``video_publish_date``, ``video_title``, ``video_description``,
        ``video_category``, ``video_view_count``, ``video_comment_count``,
        ``video_like_count``, ``video_dislike_count``, ``video_thumbnail``,
        ``video_tags`` (tags joined with ``|``) and ``collection_date``
        (current UTC time as a POSIX timestamp, whole seconds). Fields
        missing from *item* come back as ``None`` (``''`` for tags).
    """
    # Imported locally: the container ABCs live in ``collections.abc``; the
    # plain ``collections`` alias was removed in Python 3.10.
    from collections.abc import Iterable

    if not isinstance(item, dict):
        return dict()

    # The caller chooses which ``part``s to request, so either section may be
    # absent; fall back to empty dicts instead of raising KeyError.
    snippet = item.get("snippet") or {}
    statistics = item.get("statistics") or {}

    tags = snippet.get('tags')
    if isinstance(tags, str):
        # A bare string is itself iterable; joining it would split it into
        # single characters, so keep it as one tag.
        video_tags = tags
    elif isinstance(tags, Iterable):
        video_tags = '|'.join(tags)
    else:
        video_tags = ''

    thumbnails = snippet.get("thumbnails") or {}
    high_thumbnail = thumbnails.get("high") or {}

    # Use an aware UTC datetime: calling timestamp() on the naive result of
    # utcnow() treats it as local time and shifts the epoch by the UTC offset.
    collected_at = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0)

    video_meta = {
        "video_id": item.get("id"),
        "channel_title": snippet.get("channelTitle"),
        "channel_id": snippet.get("channelId"),
        # parse_yt_datetime is defined elsewhere in this file (not shown here);
        # it receives None when publishedAt is missing -- TODO confirm it copes.
        "video_publish_date": parse_yt_datetime(snippet.get("publishedAt")),
        "video_title": snippet.get("title"),
        "video_description": snippet.get("description"),
        "video_category": snippet.get("categoryId"),
        "video_view_count": statistics.get("viewCount"),
        "video_comment_count": statistics.get("commentCount"),
        "video_like_count": statistics.get("likeCount"),
        "video_dislike_count": statistics.get("dislikeCount"),
        "video_thumbnail": high_thumbnail.get("url"),
        "video_tags": video_tags,
        "collection_date": collected_at.timestamp(),
    }
    return video_meta
# Getting video metadata from YouTube API
def get_video_metadata(video_id, api_key, parser=parse_video_metadata,
                       part=('statistics', 'snippet'), timeout=30):
    """Fetch video metadata from the YouTube Data API ``videos`` endpoint.

    Args:
        video_id: A single video ID (``str``), or a ``list`` / ``pd.Series``
            of IDs that is requested in chunks of 25.
        api_key: API key sent as the ``key`` query parameter.
        parser: Callable applied to each item of the response's ``items``
            list.
        part: Names of the resource parts to request; joined with commas.
        timeout: Seconds passed to ``requests.get`` as its ``timeout`` so a
            stalled connection cannot block forever.

    Returns:
        For a single ID: ``parser(first_item)``, or an empty list when the
        response has no items. For a list/Series: a list with one parsed
        entry per returned item.

    Raises:
        TypeError: If *video_id* is not a ``str``, ``list`` or ``pd.Series``.
        requests.HTTPError: If the API answers with an error status.
    """
    if not isinstance(video_id, (str, list, pd.Series)):
        raise TypeError("Could not process the type entered!")

    parts = ','.join(part)

    if isinstance(video_id, str):
        http_endpoint = ("https://www.googleapis.com/youtube/v3/videos"
                         "?part={}"
                         "&id={}&key={}&maxResults=2".format(parts, video_id, api_key))
        response = requests.get(http_endpoint, timeout=timeout)
        # Surface HTTP errors the same way the batch path does, instead of
        # reporting them as "no result".
        response.raise_for_status()
        items = response.json().get('items')
        # Kept for backward compatibility: one parsed record when found,
        # otherwise an empty list.
        return parser(items[0]) if items else []

    video_metadata = []
    # _chunker is defined elsewhere in this file (not shown here); presumably
    # it yields successive groups of at most `chunksize` IDs -- TODO confirm.
    for chunk in tqdm(_chunker(video_id, chunksize=25)):
        id_list = ','.join(chunk)
        http_endpoint = ("https://www.googleapis.com/youtube/v3/videos"
                         "?part={}&key={}"
                         "&id={}".format(parts, api_key, id_list))
        response = requests.get(http_endpoint, timeout=timeout)
        response.raise_for_status()
        items = response.json().get('items') or []
        video_metadata.extend(parser(item) for item in items)
    return video_metadata