From 71168a29b366c25f762a0ae8dde2a95c012f67a7 Mon Sep 17 00:00:00 2001 From: Robin Universe Date: Mon, 17 Jan 2022 17:36:43 -0600 Subject: [PATCH] Added /dl/ and .mp4 endpoints These still have some hard-coded paths built in, so take care to fix those before using --- twitfix.py | 140 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 106 insertions(+), 34 deletions(-) diff --git a/twitfix.py b/twitfix.py index d5dc42f..0790b05 100644 --- a/twitfix.py +++ b/twitfix.py @@ -1,12 +1,14 @@ -from flask import Flask, render_template, request, redirect, Response +from flask import Flask, render_template, request, redirect, Response, send_from_directory, url_for, send_file, make_response import youtube_dl import textwrap import twitter import pymongo +import requests import json import re import os import urllib.parse +import urllib.request app = Flask(__name__) pathregex = re.compile("\\w{1,15}\\/(status|statuses)\\/\\d{2,20}") @@ -47,13 +49,13 @@ elif link_cache_system == "db": @app.route('/latest/') # Try to return the latest video def latest(): - vnf = db.linkCache.find_one(sort = [('_id', pymongo.DESCENDING)]) - desc = re.sub(r' http.*t\.co\S+', '', vnf['description']) - urlUser = urllib.parse.quote(vnf['uploader']) - urlDesc = urllib.parse.quote(desc) - urlLink = urllib.parse.quote(vnf['url']) - print(" [ ✔ ] Latest video page loaded: " + vnf['tweet'] ) - return render_template('inline.html', vidlink=vnf['url'], vidurl=vnf['url'], desc=desc, pic=vnf['thumbnail'], user=vnf['uploader'], video_link=vnf['url'], color=config['config']['color'], appname=config['config']['appname'], repo=config['config']['repo'], url=config['config']['url'], urlDesc=urlDesc, urlUser=urlUser, urlLink=urlLink, tweet=vnf['tweet']) + vnf = db.linkCache.find_one(sort = [('_id', pymongo.DESCENDING)]) + desc = re.sub(r' http.*t\.co\S+', '', vnf['description']) + urlUser = urllib.parse.quote(vnf['uploader']) + urlDesc = urllib.parse.quote(desc) + urlLink = urllib.parse.quote(vnf['url']) + print(" ➤ [ ✔ ] Latest video page loaded: " + vnf['tweet'] ) + return render_template('inline.html', vidlink=vnf['url'], vidurl=vnf['url'], desc=desc, pic=vnf['thumbnail'], user=vnf['uploader'], video_link=vnf['url'], color=config['config']['color'], appname=config['config']['appname'], repo=config['config']['repo'], url=config['config']['url'], urlDesc=urlDesc, urlUser=urlUser, urlLink=urlLink, tweet=vnf['tweet']) @app.route('/') # If the useragent is discord, return the embed, if not, redirect to configured repo directly def default(): @@ -63,19 +65,35 @@ def default(): else: return redirect(config['config']['repo'], 301) -@app.route('/oembed.json') +@app.route('/oembed.json') #oEmbed endpoint def oembedend(): desc = request.args.get("desc", None) user = request.args.get("user", None) link = request.args.get("link", None) return o_embed_gen(desc,user,link) -@app.route('/') +@app.route('/') # Default endpoint used by everything def twitfix(sub_path): user_agent = request.headers.get('user-agent') match = pathregex.search(sub_path) - if sub_path.endswith(".mp4"): - return dir(sub_path.replace(".mp4","")) + + if request.url.startswith("https://d.fx"): # Matches d.fx? Try to give the user a direct link + if user_agent in generate_embed_user_agents: + print( " ➤ [ D ] d.fx link shown to discord user-agent!") + if request.url.endswith(".mp4") and "?" not in request.url: + return dl(sub_path) + else: + return message("To use a direct MP4 link in discord, remove anything past '?' and put '.mp4' at the end") + else: + print(" ➤ [ R ] Redirect to MP4 using d.fxtwitter.com") + return dir(sub_path) + + elif sub_path.endswith(".mp4"): + if "?" not in request.url: + return dl(sub_path) + else: + return message("To use a direct MP4 link in discord, remove anything past '?' and put '.mp4' at the end") + if match is not None: twitter_url = sub_path @@ -87,7 +105,7 @@ def twitfix(sub_path): return res else: - print(" [ R ] Redirect to " + twitter_url) + print(" ➤ [ R ] Redirect to " + twitter_url) return redirect(twitter_url, 301) else: return message("This doesn't appear to be a twitter URL") @@ -95,20 +113,49 @@ def twitfix(sub_path): @app.route('/other/') # Show all info that Youtube-DL can get about a video as a json def other(sub_path): otherurl = request.url.split("/other/", 1)[1].replace(":/","://") - print("[ OTHER ] Other URL embed attempted: " + otherurl) + print(" ➤ [ OTHER ] Other URL embed attempted: " + otherurl) res = embed_video(otherurl) return res @app.route('/info/') # Show all info that Youtube-DL can get about a video as a json def info(sub_path): infourl = request.url.split("/info/", 1)[1].replace(":/","://") - print("[ INFO ] Info data requested: " + infourl) + print(" ➤ [ INFO ] Info data requested: " + infourl) with youtube_dl.YoutubeDL({'outtmpl': '%(id)s.%(ext)s'}) as ydl: result = ydl.extract_info(infourl, download=False) return result -@app.route('/dir/') +@app.route('/dl/') # Download the tweets video, and rehost it +def dl(sub_path): + print(' ➤ [[ !!! TRYING TO DOWNLOAD FILE !!! ]] Downloading file from ' + sub_path) + url = sub_path + match = pathregex.search(url) + if match is not None: + twitter_url = url + if match.start() == 0: + twitter_url = "https://twitter.com/" + url + + mp4link = direct_video_link(twitter_url) + filename = (sub_path.split('/')[-1].split('.mp4')[0] + '.mp4') + + PATH = ( './static/' + filename ) + if os.path.isfile(PATH) and os.access(PATH, os.R_OK): + print(" ➤ [[ FILE EXISTS ]]") + else: + print(" ➤ [[ FILE DOES NOT EXIST, DOWNLOADING... ]]") + mp4file = urllib.request.urlopen(mp4link) + with open(('/home/robin/twitfix/static/' + filename), 'wb') as output: + output.write(mp4file.read()) + + print(' ➤ [[ PRESENTING FILE: '+ filename +', URL: https://fxtwitter.com/static/'+ filename +' ]]') + r = make_response(send_file(('static/' + filename), mimetype='video/mp4', max_age=100)) + r.headers['Content-Type'] = 'video/mp4' + r.headers['Sec-Fetch-Site'] = 'none' + r.headers['Sec-Fetch-User'] = '?1' + return r + +@app.route('/dir/') # Try to return a direct link to the MP4 on twitters servers def dir(sub_path): user_agent = request.headers.get('user-agent') url = sub_path @@ -124,11 +171,16 @@ def dir(sub_path): return res else: - print(" [ R ] Redirect to direct MP4 URL") + print(" ➤ [ R ] Redirect to direct MP4 URL") return direct_video(twitter_url) else: return redirect(url, 301) +@app.route('/favicon.ico') # This shit don't work +def favicon(): + return send_from_directory(os.path.join(app.root_path, 'static'), + 'favicon.ico',mimetype='image/vnd.microsoft.icon') + def direct_video(video_link): # Just get a redirect to a MP4 link from any tweet link cached_vnf = get_vnf_from_link_cache(video_link) if cached_vnf == None: @@ -136,13 +188,28 @@ def direct_video(video_link): # Just get a redirect to a MP4 link from any tweet vnf = link_to_vnf(video_link) add_vnf_to_link_cache(video_link, vnf) return redirect(vnf['url'], 301) - print(" [ D ] Redirecting to direct URL: " + vnf['url']) + print(" ➤ [ D ] Redirecting to direct URL: " + vnf['url']) except Exception as e: print(e) return message("Failed to scan your link!") else: return redirect(cached_vnf['url'], 301) - print(" [ D ] Redirecting to direct URL: " + vnf['url']) + print(" ➤ [ D ] Redirecting to direct URL: " + vnf['url']) + +def direct_video_link(video_link): # Just get a redirect to a MP4 link from any tweet link + cached_vnf = get_vnf_from_link_cache(video_link) + if cached_vnf == None: + try: + vnf = link_to_vnf(video_link) + add_vnf_to_link_cache(video_link, vnf) + return vnf['url'] + print(" ➤ [ D ] Redirecting to direct URL: " + vnf['url']) + except Exception as e: + print(e) + return message("Failed to scan your link!") + else: + return cached_vnf['url'] + print(" ➤ [ D ] Redirecting to direct URL: " + vnf['url']) def embed_video(video_link): # Return Embed from any tweet link cached_vnf = get_vnf_from_link_cache(video_link) @@ -170,7 +237,7 @@ def video_info(url, tweet="", desc="", thumb="", uploader=""): # Return a dict o return vnf def link_to_vnf_from_api(video_link): - print(" [ + ] Attempting to download tweet info from Twitter API") + print(" ➤ [ + ] Attempting to download tweet info from Twitter API") twid = int(re.sub(r'\?.*$','',video_link.rsplit("/", 1)[-1])) # gets the tweet ID as a int from the passed url tweet = twitter_api.statuses.show(_id=twid, tweet_mode="extended") @@ -186,11 +253,11 @@ def link_to_vnf_from_api(video_link): else: url = re.findall(r'(https?://[^\s]+)', tweet['full_text'])[0] thumb = "Non video link with url" - print(" [ NV ] Non video tweet, but has a link: " + url) + print(" ➤ [ NV ] Non video tweet, but has a link: " + url) else: url = re.findall(r'(https?://[^\s]+)', tweet['full_text'])[0] thumb = "Non video link with url" - print(" [ NV ] Non video tweet, but has a link: " + url) + print(" ➤ [ NV ] Non video tweet, but has a link: " + url) if len(tweet['full_text']) > 200: text = textwrap.shorten(tweet['full_text'], width=200, placeholder="...") @@ -201,7 +268,7 @@ def link_to_vnf_from_api(video_link): return vnf def link_to_vnf_from_youtubedl(video_link): - print("Attempting to download tweet info via YoutubeDL") + print(" ➤ [ X ] Attempting to download tweet info via YoutubeDL: " + video_link) with youtube_dl.YoutubeDL({'outtmpl': '%(id)s.%(ext)s'}) as ydl: result = ydl.extract_info(video_link, download=False) vnf = video_info(result['url'], video_link, result['description'].rsplit(' ',1)[0], result['thumbnail'], result['uploader']) @@ -212,21 +279,21 @@ def link_to_vnf(video_link): # Return a VideoInfo object or die trying try: return link_to_vnf_from_api(video_link) except Exception as e: - print("API Failed") + print(" ➤ [ !!! ] API Failed") print(e) return link_to_vnf_from_youtubedl(video_link) elif config['config']['method'] == 'api': try: return link_to_vnf_from_api(video_link) except Exception as e: - print(" [ X ] API Failed") + print(" ➤ [ X ] API Failed") print(e) return None elif config['config']['method'] == 'youtube-dl': try: return link_to_vnf_from_youtubedl(video_link) except Exception as e: - print(" [ X ] Youtube-DL Failed") + print(" ➤ [ X ] Youtube-DL Failed") print(e) return None else: @@ -237,11 +304,11 @@ def get_vnf_from_link_cache(video_link): if link_cache_system == "db": collection = db.linkCache vnf = collection.find_one({'tweet': video_link}) - if vnf != None: - print(" [ ✔ ] Link located in DB cache") + if vnf != None: + print(" ➤ [ ✔ ] Link located in DB cache") return vnf else: - print(" [ X ] Link not in DB cache") + print(" ➤ [ X ] Link not in DB cache") return None elif link_cache_system == "json": if video_link in link_cache: @@ -249,21 +316,21 @@ def get_vnf_from_link_cache(video_link): vnf = link_cache[video_link] return vnf else: - print(" [ X ] Link not in json cache") + print(" ➤ [ X ] Link not in json cache") return None def add_vnf_to_link_cache(video_link, vnf): if link_cache_system == "db": try: out = db.linkCache.insert_one(vnf) - print(" [ + ] Link added to DB cache ") + print(" ➤ [ + ] Link added to DB cache ") return True except Exception: - print(" [ X ] Failed to add link to DB cache") + print(" ➤ [ X ] Failed to add link to DB cache") return None elif link_cache_system == "json": link_cache[video_link] = vnf - with open("links.json", "w") as outfile: + with open("links.json", "w") as outfile: json.dump(link_cache, outfile, indent=4, sort_keys=True) return None @@ -271,7 +338,11 @@ def message(text): return render_template('default.html', message=text, color=config['config']['color'], appname=config['config']['appname'], repo=config['config']['repo'], url=config['config']['url']) def embed(video_link, vnf): - print(" [ E ] Embedding " + vnf['url']) + print(" ➤ [ E ] Embedding " + vnf['url']) + if vnf['thumbnail'] == "Non video link with url": + print(" ➤ [ NV ] Redirecting Non Video Tweet to Twitter") + return redirect(vnf['url'], 301) + if vnf['url'].startswith('https://t.co') is not True: desc = re.sub(r' http.*t\.co\S+', '', vnf['description']) urlUser = urllib.parse.quote(vnf['uploader']) @@ -295,4 +366,5 @@ def o_embed_gen(description, user, video_link): return out if __name__ == "__main__": + app.config['SERVER_NAME']='localhost:80' app.run(host='0.0.0.0')