from flask import Flask, render_template, request, redirect, Response, send_from_directory, url_for, send_file, make_response, jsonify from flask_cors import CORS import youtube_dl import textwrap import twitter import pymongo import requests import json import re import os import urllib.parse import urllib.request from datetime import date import boto3 app = Flask(__name__) CORS(app) pathregex = re.compile("\\w{1,15}\\/(status|statuses)\\/\\d{2,20}") generate_embed_user_agents = [ "facebookexternalhit/1.1", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.57 Safari/537.36", "Mozilla/5.0 (Windows; U; Windows NT 10.0; en-US; Valve Steam Client/default/1596241936; ) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.117 Safari/537.36", "Mozilla/5.0 (Windows; U; Windows NT 10.0; en-US; Valve Steam Client/default/0; ) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.117 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_1) AppleWebKit/601.2.4 (KHTML, like Gecko) Version/9.0.1 Safari/601.2.4 facebookexternalhit/1.1 Facebot Twitterbot/1.0", "facebookexternalhit/1.1", "Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; Valve Steam FriendsUI Tenfoot/0; ) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.105 Safari/537.36", "Slackbot-LinkExpanding 1.0 (+https://api.slack.com/robots)", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.10; rv:38.0) Gecko/20100101 Firefox/38.0", "Mozilla/5.0 (compatible; Discordbot/2.0; +https://discordapp.com)", "TelegramBot (like TwitterBot)", "Mozilla/5.0 (compatible; January/1.0; +https://gitlab.insrt.uk/revolt/january)", "test"] # Read config from config.json. If it does not exist, create new. if not os.path.exists("config.json"): serverless_check = os.environ.get('RUNNING_SERVERLESS') if serverless_check == None: # Running on local pc, therefore we can access the filesystem with open("config.json", "w") as outfile: default_config = { "config":{ "link_cache":"dynamodb", "database":"[url to mongo database goes here]", "table":"TwiFix", "method":"youtube-dl", "color":"#43B581", "appname": "vxTwitter", "repo": "https://github.com/dylanpdx/BetterTwitFix", "url": "https://vxtwitter.com" }, "api":{"api_key":"[api_key goes here]", "api_secret":"[api_secret goes here]", "access_token":"[access_token goes here]", "access_secret":"[access_secret goes here]" } } json.dump(default_config, outfile, indent=4, sort_keys=True) else: # Running on serverless, therefore we cannot access the filesystem and must use environment variables default_config = { "config":{ "link_cache":os.environ['VXTWITTER_LINK_CACHE'], "database":os.environ['VXTWITTER_DATABASE'], "table":os.environ['VXTWITTER_DATABASE_TABLE'], "method":os.environ['VXTWITTER_METHOD'], "color":os.environ['VXTWITTER_COLOR'], "appname": os.environ['VXTWITTER_APP_NAME'], "repo": os.environ['VXTWITTER_REPO'], "url": os.environ['VXTWITTER_URL'], }, "api":{ "api_key":os.environ['VXTWITTER_TWITTER_API_KEY'], "api_secret":os.environ['VXTWITTER_TWITTER_API_SECRET'], "access_token":os.environ['VXTWITTER_TWITTER_ACCESS_TOKEN'], "access_secret":os.environ['VXTWITTER_TWITTER_ACCESS_SECRET'] } } config = default_config else: f = open("config.json") config = json.load(f) f.close() # If method is set to API or Hybrid, attempt to auth with the Twitter API if config['config']['method'] in ('api', 'hybrid'): auth = twitter.oauth.OAuth(config['api']['access_token'], config['api']['access_secret'], config['api']['api_key'], config['api']['api_secret']) twitter_api = twitter.Twitter(auth=auth) link_cache_system = config['config']['link_cache'] DYNAMO_CACHE_TBL=None if link_cache_system=="dynamodb": DYNAMO_CACHE_TBL=os.environ['CACHE_TABLE'] if link_cache_system == "json": link_cache = {} if not os.path.exists("config.json"): with open("config.json", "w") as outfile: default_link_cache = {"test":"test"} json.dump(default_link_cache, outfile, indent=4, sort_keys=True) f = open('links.json',) link_cache = json.load(f) f.close() elif link_cache_system == "db": client = pymongo.MongoClient(config['config']['database'], connect=False) table = config['config']['table'] db = client[table] elif link_cache_system == "dynamodb": client = boto3.resource('dynamodb') @app.route('/') # If the useragent is discord, return the embed, if not, redirect to configured repo directly def default(): user_agent = request.headers.get('user-agent') if user_agent in generate_embed_user_agents: return message("TwitFix is an attempt to fix twitter video embeds in discord! created by Robin Universe :)\n\nšŸ’–\n\nClick me to be redirected to the repo!") else: return redirect(config['config']['repo'], 301) @app.route('/oembed.json') #oEmbed endpoint def oembedend(): desc = request.args.get("desc", None) user = request.args.get("user", None) link = request.args.get("link", None) ttype = request.args.get("ttype", None) return oEmbedGen(desc, user, link, ttype) @app.route('/') # Default endpoint used by everything def twitfix(sub_path): user_agent = request.headers.get('user-agent') match = pathregex.search(sub_path) print(request.url) if request.url.startswith("https://d.vx"): # Matches d.fx? Try to give the user a direct link if user_agent in generate_embed_user_agents: print( " āž¤ [ D ] d.vx link shown to discord user-agent!") if request.url.endswith(".mp4") and "?" not in request.url: return dl(sub_path) else: return message("To use a direct MP4 link in discord, remove anything past '?' and put '.mp4' at the end") else: print(" āž¤ [ R ] Redirect to MP4 using d.fxtwitter.com") return dir(sub_path) elif request.url.endswith(".mp4") or request.url.endswith("%2Emp4"): twitter_url = "https://twitter.com/" + sub_path if "?" not in request.url: clean = twitter_url[:-4] else: clean = twitter_url return dl(clean) # elif request.url.endswith(".json") or request.url.endswith("%2Ejson"): # twitter_url = "https://twitter.com/" + sub_path # if "?" not in request.url: # clean = twitter_url[:-5] # else: # clean = twitter_url # print( " āž¤ [ API ] VNF Json api hit!") # vnf = link_to_vnf_from_api(clean.replace(".json","")) # if user_agent in generate_embed_user_agents: # return message("VNF Data: ( discord useragent preview )\n\n"+ json.dumps(vnf, default=str)) # else: # return Response(response=json.dumps(vnf, default=str), status=200, mimetype="application/json") elif request.url.endswith("/1") or request.url.endswith("/2") or request.url.endswith("/3") or request.url.endswith("/4") or request.url.endswith("%2F1") or request.url.endswith("%2F2") or request.url.endswith("%2F3") or request.url.endswith("%2F4"): twitter_url = "https://twitter.com/" + sub_path if "?" not in request.url: clean = twitter_url[:-2] else: clean = twitter_url image = ( int(request.url[-1]) - 1 ) return embed_video(clean, image) if match is not None: twitter_url = sub_path if match.start() == 0: twitter_url = "https://twitter.com/" + sub_path if user_agent in generate_embed_user_agents: res = embed_video(twitter_url) return res else: print(" āž¤ [ R ] Redirect to " + twitter_url) return redirect(twitter_url, 301) else: return message("This doesn't appear to be a twitter URL") @app.route('/dir/') # Try to return a direct link to the MP4 on twitters servers def dir(sub_path): user_agent = request.headers.get('user-agent') url = sub_path match = pathregex.search(url) if match is not None: twitter_url = url if match.start() == 0: twitter_url = "https://twitter.com/" + url if user_agent in generate_embed_user_agents: res = embed_video(twitter_url) return res else: print(" āž¤ [ R ] Redirect to direct MP4 URL") return direct_video(twitter_url) else: return redirect(url, 301) @app.route('/favicon.ico') # This shit don't work def favicon(): return send_from_directory(os.path.join(app.root_path, 'static'), 'favicon.ico',mimetype='image/vnd.microsoft.icon') def direct_video(video_link): # Just get a redirect to a MP4 link from any tweet link cached_vnf = getVnfFromLinkCache(video_link) if cached_vnf == None: try: vnf = link_to_vnf(video_link) addVnfToLinkCache(video_link, vnf) return redirect(vnf['url'], 301) print(" āž¤ [ D ] Redirecting to direct URL: " + vnf['url']) except Exception as e: print(e) return message("Failed to scan your link!") else: return redirect(cached_vnf['url'], 301) print(" āž¤ [ D ] Redirecting to direct URL: " + vnf['url']) def direct_video_link(video_link): # Just get a redirect to a MP4 link from any tweet link cached_vnf = getVnfFromLinkCache(video_link) if cached_vnf == None: try: vnf = link_to_vnf(video_link) addVnfToLinkCache(video_link, vnf) return vnf['url'] print(" āž¤ [ D ] Redirecting to direct URL: " + vnf['url']) except Exception as e: print(e) return message("Failed to scan your link!") else: return cached_vnf['url'] print(" āž¤ [ D ] Redirecting to direct URL: " + vnf['url']) def embed_video(video_link, image=0): # Return Embed from any tweet link cached_vnf = getVnfFromLinkCache(video_link) if cached_vnf == None: try: vnf = link_to_vnf(video_link) addVnfToLinkCache(video_link, vnf) return embed(video_link, vnf, image) except Exception as e: print(e) return message("Failed to scan your link!") else: return embed(video_link, cached_vnf, image) def tweetInfo(url, tweet="", desc="", thumb="", uploader="", screen_name="", pfp="", tweetType="", images="", hits=0, likes=0, rts=0, time="", qrt={}, nsfw=False): # Return a dict of video info with default values vnf = { "tweet" : tweet, "url" : url, "description" : desc, "thumbnail" : thumb, "uploader" : uploader, "screen_name" : screen_name, "pfp" : pfp, "type" : tweetType, "images" : images, "hits" : hits, "likes" : likes, "rts" : rts, "time" : time, "qrt" : qrt, "nsfw" : nsfw } return vnf def link_to_vnf_from_api(video_link): print(" āž¤ [ + ] Attempting to download tweet info from Twitter API") twid = int(re.sub(r'\?.*$','',video_link.rsplit("/", 1)[-1])) # gets the tweet ID as a int from the passed url tweet = twitter_api.statuses.show(_id=twid, tweet_mode="extended") # For when I need to poke around and see what a tweet looks like #print(tweet) imgs = ["","","","", ""] print(" āž¤ [ + ] Tweet Type: " + tweetType(tweet)) # Check to see if tweet has a video, if not, make the url passed to the VNF the first t.co link in the tweet if tweetType(tweet) == "Video": if tweet['extended_entities']['media'][0]['video_info']['variants']: best_bitrate = 0 thumb = tweet['extended_entities']['media'][0]['media_url'] for video in tweet['extended_entities']['media'][0]['video_info']['variants']: if video['content_type'] == "video/mp4" and video['bitrate'] > best_bitrate: url = video['url'] elif tweetType(tweet) == "Text": url = "" thumb = "" else: imgs = ["","","","", ""] i = 0 for media in tweet['extended_entities']['media']: imgs[i] = media['media_url_https'] i = i + 1 #print(imgs) imgs[4] = str(i) url = "" images= imgs thumb = tweet['extended_entities']['media'][0]['media_url_https'] qrt = {} if 'quoted_status' in tweet: qrt['desc'] = tweet['quoted_status']['full_text'] qrt['handle'] = tweet['quoted_status']['user']['name'] qrt['screen_name'] = tweet['quoted_status']['user']['screen_name'] text = tweet['full_text'] if 'possibly_sensitive' in tweet: nsfw = tweet['possibly_sensitive'] else: nsfw = False vnf = tweetInfo( url, video_link, text, thumb, tweet['user']['name'], tweet['user']['screen_name'], tweet['user']['profile_image_url'], tweetType(tweet), likes=tweet['favorite_count'], rts=tweet['retweet_count'], time=tweet['created_at'], qrt=qrt, images=imgs, nsfw=nsfw ) return vnf def link_to_vnf_from_youtubedl(video_link): print(" āž¤ [ X ] Attempting to download tweet info via YoutubeDL: " + video_link) with youtube_dl.YoutubeDL({'outtmpl': '%(id)s.%(ext)s'}) as ydl: result = ydl.extract_info(video_link, download=False) vnf = tweetInfo(result['url'], video_link, result['description'].rsplit(' ',1)[0], result['thumbnail'], result['uploader']) return vnf def link_to_vnf(video_link): # Return a VideoInfo object or die trying if config['config']['method'] == 'hybrid': try: return link_to_vnf_from_api(video_link) except Exception as e: print(" āž¤ [ !!! ] API Failed") print(e) return link_to_vnf_from_youtubedl(video_link) elif config['config']['method'] == 'api': try: return link_to_vnf_from_api(video_link) except Exception as e: print(" āž¤ [ X ] API Failed") print(e) return None elif config['config']['method'] == 'youtube-dl': try: return link_to_vnf_from_youtubedl(video_link) except Exception as e: print(" āž¤ [ X ] Youtube-DL Failed") print(e) return None else: print("Please set the method key in your config file to 'api' 'youtube-dl' or 'hybrid'") return None def getVnfFromLinkCache(video_link): if link_cache_system == "db": collection = db.linkCache vnf = collection.find_one({'tweet': video_link}) # print(vnf) if vnf != None: hits = ( vnf['hits'] + 1 ) print(" āž¤ [ āœ” ] Link located in DB cache. " + "hits on this link so far: [" + str(hits) + "]") query = { 'tweet': video_link } change = { "$set" : { "hits" : hits } } out = db.linkCache.update_one(query, change) return vnf else: print(" āž¤ [ X ] Link not in DB cache") return None elif link_cache_system == "json": if video_link in link_cache: print("Link located in json cache") vnf = link_cache[video_link] return vnf else: print(" āž¤ [ X ] Link not in json cache") return None elif link_cache_system == "dynamodb": table = client.Table(DYNAMO_CACHE_TBL) response = table.get_item( Key={ 'tweet': video_link } ) if 'Item' in response: print("Link located in dynamodb cache") vnf = response['Item'] return vnf else: print(" āž¤ [ X ] Link not in dynamodb cache") return None def addVnfToLinkCache(video_link, vnf): if link_cache_system == "db": try: out = db.linkCache.insert_one(vnf) print(" āž¤ [ + ] Link added to DB cache ") return True except Exception: print(" āž¤ [ X ] Failed to add link to DB cache") return None elif link_cache_system == "json": link_cache[video_link] = vnf with open("links.json", "w") as outfile: json.dump(link_cache, outfile, indent=4, sort_keys=True) return None elif link_cache_system == "dynamodb": table = client.Table(DYNAMO_CACHE_TBL) table.put_item( Item={ 'tweet': video_link, 'vnf': vnf } ) print(" āž¤ [ + ] Link added to dynamodb cache ") return True def message(text): return render_template( 'default.html', message = text, color = config['config']['color'], appname = config['config']['appname'], repo = config['config']['repo'], url = config['config']['url'] ) def embed(video_link, vnf, image): print(vnf) print(" āž¤ [ E ] Embedding " + vnf['type'] + ": " + vnf['url']) desc = re.sub(r' http.*t\.co\S+', '', vnf['description']) urlUser = urllib.parse.quote(vnf['uploader']) urlDesc = urllib.parse.quote(desc) urlLink = urllib.parse.quote(video_link) likeDisplay = ("\n\nšŸ’– " + str(vnf['likes']) + " šŸ” " + str(vnf['rts']) + "\n") try: if vnf['type'] == "": desc = desc elif vnf['type'] == "Video": desc = desc elif vnf['qrt'] == {}: # Check if this is a QRT and modify the description desc = (desc + likeDisplay) else: qrtDisplay = ("\nā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€\n āž¤ QRT of " + vnf['qrt']['handle'] + " (@" + vnf['qrt']['screen_name'] + "):\nā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€\n'" + vnf['qrt']['desc'] + "'") desc = (desc + qrtDisplay + likeDisplay) except: vnf['likes'] = 0; vnf['rts'] = 0; vnf['time'] = 0 print(' āž¤ [ X ] Failed QRT check - old VNF object') if vnf['type'] == "Text": # Change the template based on tweet type template = 'text.html' if vnf['type'] == "Image": image = vnf['images'][image] template = 'image.html' if vnf['type'] == "Video": urlDesc = urllib.parse.quote(textwrap.shorten(desc, width=220, placeholder="...")) template = 'video.html' if vnf['type'] == "": urlDesc = urllib.parse.quote(textwrap.shorten(desc, width=220, placeholder="...")) template = 'video.html' color = "#7FFFD4" # Green if vnf['nsfw'] == True: color = "#800020" # Red return render_template( template, likes = vnf['likes'], rts = vnf['rts'], time = vnf['time'], screenName = vnf['screen_name'], vidlink = vnf['url'], pfp = vnf['pfp'], vidurl = vnf['url'], desc = desc, pic = image, user = vnf['uploader'], video_link = video_link, color = color, appname = config['config']['appname'], repo = config['config']['repo'], url = config['config']['url'], urlDesc = urlDesc, urlUser = urlUser, urlLink = urlLink, tweetLink = vnf['tweet'] ) def tweetType(tweet): # Are we dealing with a Video, Image, or Text tweet? if 'extended_entities' in tweet: if 'video_info' in tweet['extended_entities']['media'][0]: out = "Video" else: out = "Image" else: out = "Text" return out def oEmbedGen(description, user, video_link, ttype): out = { "type" : ttype, "version" : "1.0", "provider_name" : config['config']['appname'], "provider_url" : config['config']['repo'], "title" : description, "author_name" : user, "author_url" : video_link } return out if __name__ == "__main__": app.config['SERVER_NAME']='localhost:80' app.run(host='0.0.0.0')