This repository has been archived on 2022-08-02. You can view files and clone it, but cannot push or open issues or pull requests.
Twxtter-main/twitfix.py

851 lines
25 KiB
Python

from flask import (
Flask,
render_template,
request,
redirect,
Response,
send_from_directory,
url_for,
send_file,
make_response,
jsonify,
)
from flask_cors import CORS
import youtube_dl
import textwrap
import twitter
import pymongo
import requests
import json
import re
import os
import urllib.parse
import urllib.request
from datetime import date
# The single Flask application object; every route below is registered on it.
app = Flask(__name__)
# Enable cross-origin requests for all routes of the app.
CORS(app)
# Matches "<handle>/status/<id>" or "<handle>/statuses/<id>" anywhere in a
# path: a 1-15 character handle followed by a 2-20 digit tweet id.
pathregex = re.compile(r"\w{1,15}\/(status|statuses)\/\d{2,20}")
# Exact User-Agent strings that receive an embed page instead of a redirect.
# Matching is done with a plain `in` test, so a client must send one of these
# strings verbatim. Each entry is listed once.
generate_embed_user_agents = [
    "facebookexternalhit/1.1",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.57 Safari/537.36",
    "Mozilla/5.0 (Windows; U; Windows NT 10.0; en-US; Valve Steam Client/default/1596241936; ) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.117 Safari/537.36",
    "Mozilla/5.0 (Windows; U; Windows NT 10.0; en-US; Valve Steam Client/default/0; ) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.117 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_1) AppleWebKit/601.2.4 (KHTML, like Gecko) Version/9.0.1 Safari/601.2.4 facebookexternalhit/1.1 Facebot Twitterbot/1.0",
    "Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; Valve Steam FriendsUI Tenfoot/0; ) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.105 Safari/537.36",
    "Slackbot-LinkExpanding 1.0 (+https://api.slack.com/robots)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.10; rv:38.0) Gecko/20100101 Firefox/38.0",
    "Mozilla/5.0 (compatible; Discordbot/2.0; +https://discordapp.com)",
    "TelegramBot (like TwitterBot)",
    "Mozilla/5.0 (compatible; January/1.0; +https://gitlab.insrt.uk/revolt/january)",
    "test",
]
# Read config from config.json. If it does not exist, write a file holding
# the defaults below and use those defaults for this run.
default_config = {
    "config": {
        "link_cache": "json",
        "database": "[url to mongo database goes here]",
        "table": "TwiFix",
        "method": "youtube-dl",
        "color": "#43B581",
        "appname": "TwitFix",
        "repo": "https://github.com/robinuniverse/twitfix",
        "url": "https://fxtwitter.com",
    },
    "api": {
        "api_key": "[api_key goes here]",
        "api_secret": "[api_secret goes here]",
        "access_token": "[access_token goes here]",
        "access_secret": "[access_secret goes here]",
    },
}
if not os.path.exists("config.json"):
    with open("config.json", "w") as outfile:
        json.dump(default_config, outfile, indent=4, sort_keys=True)
    config = default_config
else:
    # The context manager closes the handle even when the file is not
    # valid JSON and json.load raises.
    with open("config.json") as f:
        config = json.load(f)
# The Twitter API client is only created for the "api" and "hybrid" methods;
# with any other method `twitter_api` is never defined.
if config["config"]["method"] in ("api", "hybrid"):
    _api_credentials = config["api"]
    auth = twitter.oauth.OAuth(
        _api_credentials["access_token"],
        _api_credentials["access_secret"],
        _api_credentials["api_key"],
        _api_credentials["api_secret"],
    )
    twitter_api = twitter.Twitter(auth=auth)
# Select the link cache backend: "json" keeps the cache in links.json next to
# the process, "db" stores it in MongoDB.
link_cache_system = config["config"]["link_cache"]
if link_cache_system == "json":
    link_cache = {}
    # Bootstrap the cache file on first run. The check and the write must
    # target links.json (not config.json), otherwise the load below fails
    # with FileNotFoundError on a fresh install.
    if not os.path.exists("links.json"):
        with open("links.json", "w") as outfile:
            default_link_cache = {"test": "test"}
            json.dump(default_link_cache, outfile, indent=4, sort_keys=True)
    with open("links.json") as f:
        link_cache = json.load(f)
elif link_cache_system == "db":
    # connect=False defers the actual connection until the first operation.
    client = pymongo.MongoClient(config["config"]["database"], connect=False)
    table = config["config"]["table"]
    db = client[table]
@app.route("/bidoof/")
def bidoof():
    """Permanently redirect (301) to a fixed image on the Discord CDN."""
    target = "https://cdn.discordapp.com/attachments/291764448757284885/937343686927319111/IMG_20211226_202956_163.webp"
    return redirect(target, 301)
@app.route("/thisiswhatrunstwxtter/")
def discord():
    """Permanently redirect (301) to a fixed screenshot on the Discord CDN."""
    target = "https://cdn.discordapp.com/attachments/932899721767448607/976472084974829568/unknown.png"
    return redirect(target, 301)
@app.route("/stats/")
def statsPage():
    """Render stats.html with today's usage counters.

    getStats() returns None when no stats document exists for today yet
    (nothing has been counted), so every counter falls back to 0 instead of
    failing on a None subscript.
    """
    today = str(date.today())
    stats = getStats(today) or {}
    return render_template(
        "stats.html",
        embeds=stats.get("embeds", 0),
        # Keyword spelling kept as-is; presumably stats.html reads
        # `downloadss` -- verify against the template before renaming.
        downloadss=stats.get("downloads", 0),
        api=stats.get("api", 0),
        linksCached=stats.get("linksCached", 0),
        date=today,
    )
@app.route("/latest/")
def latest():
    """Serve the static "latest" page template."""
    page = render_template("latest.html")
    return page
@app.route("/copy.svg")  # Return a SVG needed for Latest
def icon():
    """Serve static/copy.svg with an SVG mimetype."""
    static_dir = os.path.join(app.root_path, "static")
    return send_from_directory(static_dir, "copy.svg", mimetype="image/svg+xml")
@app.route("/font.ttf")  # Return a font needed for Latest
def font():
    """Serve static/NotoColorEmoji.ttf as a generic binary download."""
    static_dir = os.path.join(app.root_path, "static")
    return send_from_directory(
        static_dir,
        "NotoColorEmoji.ttf",
        mimetype="application/octet-stream",
    )
@app.route("/top/")  # Try to return the most hit video
def top():
    """Render inline.html for the cached link with the highest hit count.

    Reads from the MongoDB link cache (`db.linkCache`). When the cache holds
    no entries at all, a plain message page is returned instead of failing
    on a None lookup result.
    """
    vnf = db.linkCache.find_one(sort=[("hits", pymongo.DESCENDING)])
    if vnf is None:
        return message("No videos have been cached yet")

    # Drop the trailing t.co link from the description.
    desc = re.sub(r" http.*t\.co\S+", "", vnf["description"])
    urlUser = urllib.parse.quote(vnf["uploader"])
    urlDesc = urllib.parse.quote(desc)
    urlLink = urllib.parse.quote(vnf["url"])
    print(" ➤ [ ✔ ] Top video page loaded: " + vnf["tweet"])
    return render_template(
        "inline.html",
        page="Top",
        vidlink=vnf["url"],
        vidurl=vnf["url"],
        desc=desc,
        pic=vnf["thumbnail"],
        user=vnf["uploader"],
        video_link=vnf["url"],
        color=config["config"]["color"],
        appname=config["config"]["appname"],
        repo=config["config"]["repo"],
        url=config["config"]["url"],
        urlDesc=urlDesc,
        urlUser=urlUser,
        urlLink=urlLink,
        tweet=vnf["tweet"],
    )
@app.route("/api/latest/")  # Return raw VNF data, most recently inserted first
def apiLatest():
    """Return one page of cached VNF documents as JSON, sorted by `_id` descending.

    Query parameters: `tweets` (page size, default 10) and `page` (zero-based
    page index, default 0). A page size above 15 is replaced by 1.
    """
    per_page = request.args.get("tweets", default=10, type=int)
    page = request.args.get("page", default=0, type=int)
    if per_page > 15:
        per_page = 1

    cursor = db.linkCache.find(sort=[("_id", pymongo.DESCENDING)])
    cursor = cursor.skip(per_page * page).limit(per_page)
    bigvnf = list(cursor)

    print(" ➤ [ ✔ ] Latest video API called")
    addToStat("api")
    # default=str stringifies values json cannot encode natively.
    body = json.dumps(bigvnf, default=str)
    return Response(response=body, status=200, mimetype="application/json")
@app.route("/api/top/")  # Return raw VNF data, most hit links first
def apiTop():
    """Return one page of cached VNF documents as JSON, sorted by `hits` descending.

    Query parameters: `tweets` (page size, default 10) and `page` (zero-based
    page index, default 0). A page size above 15 is replaced by 1.
    """
    per_page = request.args.get("tweets", default=10, type=int)
    page = request.args.get("page", default=0, type=int)
    if per_page > 15:
        per_page = 1

    cursor = db.linkCache.find(sort=[("hits", pymongo.DESCENDING)])
    cursor = cursor.skip(per_page * page).limit(per_page)
    bigvnf = list(cursor)

    print(" ➤ [ ✔ ] Top video API called")
    addToStat("api")
    # default=str stringifies values json cannot encode natively.
    body = json.dumps(bigvnf, default=str)
    return Response(response=body, status=200, mimetype="application/json")
@app.route(
    "/api/stats/"
)  # Return a json of a usage stats for a given date (defaults to today)
def apiStats():
    """Return the stats document for the `date` query parameter as JSON.

    `date` defaults to today. On any failure a JSON error body with status
    500 is returned; previously the handler fell through and returned None,
    which Flask rejects as an invalid response.
    """
    try:
        addToStat("api")
        today = str(date.today())
        desiredDate = request.args.get("date", default=today, type=str)
        stat = getStats(desiredDate)
        print(" ➤ [ ✔ ] Stats API called")
        return Response(
            response=json.dumps(stat, default=str),
            status=200,
            mimetype="application/json",
        )
    except Exception as e:
        print(" ➤ [ X ] Stats API failed")
        print(e)
        return Response(
            response=json.dumps({"error": "Failed to load stats"}),
            status=500,
            mimetype="application/json",
        )
@app.route(
    "/"
)  # Embed-generating user agents get an info embed, everyone else goes to the repo
def default():
    """Handle the site root.

    Clients whose User-Agent is listed in `generate_embed_user_agents` get a
    message page; all others are permanently redirected to the configured
    repository URL.
    """
    user_agent = request.headers.get("user-agent")
    if user_agent not in generate_embed_user_agents:
        return redirect(config["config"]["repo"], 301)
    return message(
        "Twxtter is an attempt to fix twitter video embeds in discord! :)\n\n💖\n\nClick me to be redirected to the repo!"
    )
@app.route("/oembed.json")  # oEmbed endpoint
def oembedend():
    """Build an oEmbed payload from the `desc`, `user`, `link` and `ttype` query parameters.

    Missing parameters are passed on as None.
    """
    args = request.args
    return oEmbedGen(
        args.get("desc", None),
        args.get("user", None),
        args.get("link", None),
        args.get("ttype", None),
    )
@app.route("/<path:sub_path>")  # Default endpoint used by everything
def twitfix(sub_path):
    """Dispatch a tweet path according to the suffix of the requested URL.

    * `.mp4`            -> download and rehost the video via dl()
    * `.json`           -> return the VNF data fetched from the Twitter API
    * `/1` .. `/4`      -> embed the n-th image of the tweet
    * `/1p` .. `/4p`    -> same, rendered with raw=True
    * anything else     -> embed for known user agents, otherwise redirect
                           to the tweet; non-tweet paths get a message page
    """
    user_agent = request.headers.get("user-agent")
    match = pathregex.search(sub_path)
    print(request.url)

    requested = request.url
    tweet_link = "https://twitter.com/" + sub_path

    def without_suffix(length):
        # The suffix is only cut off when the URL has no query string.
        if "?" in requested:
            return tweet_link
        return tweet_link[:-length]

    if requested.endswith((".mp4", "%2Emp4")):
        return dl(without_suffix(4))

    if requested.endswith((".json", "%2Ejson")):
        clean = without_suffix(5)
        print(" ➤ [ API ] VNF Json api hit!")
        vnf = link_to_vnf_from_api(clean.replace(".json", ""))
        payload = json.dumps(vnf, default=str)
        if user_agent in generate_embed_user_agents:
            return message("VNF Data: ( discord useragent preview )\n\n" + payload)
        return Response(response=payload, status=200, mimetype="application/json")

    if requested.endswith(("/1", "/2", "/3", "/4", "%2F1", "%2F2", "%2F3", "%2F4")):
        clean = without_suffix(2)
        # The last character is the 1-based image number.
        image = int(requested[-1]) - 1
        return embed_video(clean, image)

    if requested.endswith(
        ("/1p", "/2p", "/3p", "/4p", "%2F1p", "%2F2p", "%2F3p", "%2F4p")
    ):
        clean = without_suffix(3)
        # The character before the trailing "p" is the 1-based image number.
        image = int(requested[-2]) - 1
        return embed_video(clean, image, raw=True)

    if match is None:
        return message("This doesn't appear to be a twitter URL")

    # A match at position 0 means the path is a bare "<handle>/status/<id>",
    # so the twitter.com prefix is added; otherwise the path is used as-is.
    twitter_url = sub_path
    if match.start() == 0:
        twitter_url = "https://twitter.com/" + sub_path

    if user_agent in generate_embed_user_agents:
        return embed_video(twitter_url)

    print(" ➤ [ R ] Redirect to " + twitter_url)
    return redirect(twitter_url, 301)
@app.route(
    "/other/<path:sub_path>"
)  # Try to build an embed for an arbitrary (non-twitter) URL
def other(sub_path):
    """Embed whatever URL follows `/other/` in the request.

    The target is taken from the full request URL (not `sub_path`), and every
    ":/" in it is replaced with "://".
    """
    target = request.url.split("/other/", 1)[1]
    otherurl = target.replace(":/", "://")
    print(" ➤ [ OTHER ] Other URL embed attempted: " + otherurl)
    return embed_video(otherurl)
@app.route(
    "/info/<path:sub_path>"
)  # Show all info that Youtube-DL can get about a video as a json
def info(sub_path):
    """Return youtube-dl's extracted metadata for the URL following `/info/`.

    The target is taken from the full request URL (not `sub_path`), and every
    ":/" in it is replaced with "://". Nothing is downloaded.
    """
    target = request.url.split("/info/", 1)[1]
    infourl = target.replace(":/", "://")
    print(" ➤ [ INFO ] Info data requested: " + infourl)
    ydl_options = {"outtmpl": "%(id)s.%(ext)s"}
    with youtube_dl.YoutubeDL(ydl_options) as ydl:
        result = ydl.extract_info(infourl, download=False)
    return result
@app.route("/dl/<path:sub_path>")  # Download the tweets video, and rehost it
def dl(sub_path):
    """Download a tweet's MP4 into the app's static directory and serve it.

    `sub_path` is either a bare "<handle>/status/<id>" path or a full tweet
    URL. Paths that do not look like a tweet get a message page; previously
    they failed on an unbound `twitter_url`.

    The file is checked, written and served from one location,
    `<app.root_path>/static/<id>.mp4`, instead of three differently spelled
    paths, one of which was a hard-coded absolute directory.
    """
    import shutil  # local import: only this endpoint streams a download to disk

    print(" ➤ [[ !!! TRYING TO DOWNLOAD FILE !!! ]] Downloading file from " + sub_path)
    url = sub_path
    match = pathregex.search(url)
    if match is None:
        return message("This doesn't appear to be a twitter URL")

    twitter_url = url
    if match.start() == 0:
        twitter_url = "https://twitter.com/" + url
    mp4link = direct_video_link(twitter_url)

    # Last path segment, with any ".mp4" suffix normalised to exactly one.
    filename = sub_path.split("/")[-1].split(".mp4")[0] + ".mp4"
    file_path = os.path.join(app.root_path, "static", filename)

    if os.path.isfile(file_path) and os.access(file_path, os.R_OK):
        print(" ➤ [[ FILE EXISTS ]]")
    else:
        print(" ➤ [[ FILE DOES NOT EXIST, DOWNLOADING... ]]")
        addToStat("downloads")
        # Stream into a temporary name and rename once complete, so an
        # interrupted download is never mistaken for a finished file.
        partial_path = file_path + ".part"
        with urllib.request.urlopen(mp4link) as mp4file, open(
            partial_path, "wb"
        ) as output:
            shutil.copyfileobj(mp4file, output)
        os.replace(partial_path, file_path)

    print(
        " ➤ [[ PRESENTING FILE: "
        + filename
        + ", URL: https://twxtter.com/static/"
        + filename
        + " ]]"
    )
    r = make_response(send_file(file_path, mimetype="video/mp4", max_age=100))
    r.headers["Content-Type"] = "video/mp4"
    r.headers["Sec-Fetch-Site"] = "none"
    r.headers["Sec-Fetch-User"] = "?1"
    return r
@app.route(
    "/dir/<path:sub_path>"
)  # Try to return a direct link to the MP4 on twitters servers
def dir(sub_path):
    """Redirect to the tweet's direct MP4 URL, or embed it for known user agents.

    Paths that do not look like a tweet are redirected (301) to `sub_path`
    unchanged.
    """
    user_agent = request.headers.get("user-agent")
    url = sub_path
    match = pathregex.search(url)
    if match is None:
        return redirect(url, 301)

    # Prefix bare "<handle>/status/<id>" paths; leave full URLs untouched.
    twitter_url = "https://twitter.com/" + url if match.start() == 0 else url

    if user_agent in generate_embed_user_agents:
        return embed_video(twitter_url)

    print(" ➤ [ R ] Redirect to direct MP4 URL")
    return direct_video(twitter_url)
def direct_video(video_link):  # Just get a redirect to a MP4 link from any tweet link
    """Return a 301 redirect to the direct video URL for `video_link`.

    The link cache is consulted first; on a miss the link is resolved with
    link_to_vnf() and the result cached. If resolution fails (exception or a
    None result) a "Failed to scan your link!" message page is returned and
    nothing is cached.
    """
    cached_vnf = getVnfFromLinkCache(video_link)
    if cached_vnf is not None:
        # Log before returning; the original log line sat after the return
        # and referenced a name that does not exist in this branch.
        print(" ➤ [ D ] Redirecting to direct URL: " + cached_vnf["url"])
        return redirect(cached_vnf["url"], 301)

    try:
        vnf = link_to_vnf(video_link)
        if vnf is None:
            # link_to_vnf signals failure with None; do not cache that.
            return message("Failed to scan your link!")
        addVnfToLinkCache(video_link, vnf)
        print(" ➤ [ D ] Redirecting to direct URL: " + vnf["url"])
        return redirect(vnf["url"], 301)
    except Exception as e:
        print(e)
        return message("Failed to scan your link!")
def direct_video_link(
    video_link,
):  # Just get the direct MP4 link (a string) from any tweet link
    """Return the direct video URL for `video_link`.

    The link cache is consulted first; on a miss the link is resolved with
    link_to_vnf() and the result cached. If resolution fails (exception or a
    None result), the rendered "Failed to scan your link!" page is returned
    in place of a URL, as before, and nothing is cached.
    """
    cached_vnf = getVnfFromLinkCache(video_link)
    if cached_vnf is not None:
        # Log before returning; the original log line sat after the return
        # and referenced a name that does not exist in this branch.
        print(" ➤ [ D ] Redirecting to direct URL: " + cached_vnf["url"])
        return cached_vnf["url"]

    try:
        vnf = link_to_vnf(video_link)
        if vnf is None:
            # link_to_vnf signals failure with None; do not cache that.
            return message("Failed to scan your link!")
        addVnfToLinkCache(video_link, vnf)
        print(" ➤ [ D ] Redirecting to direct URL: " + vnf["url"])
        return vnf["url"]
    except Exception as e:
        print(e)
        return message("Failed to scan your link!")
def addToStat(stat):
    """Increment today's `stat` counter in the `stats` collection.

    A single upsert does the whole job: `$inc` bumps the counter atomically,
    and when no document exists for today `$setOnInsert` seeds the other
    counters with 1, so a fresh document starts with every counter at 1 as
    before. This replaces a read-then-`$set` sequence guarded by a bare
    `except`, which could lose concurrent increments and inserted a second
    document for the same date when the counter key was missing.

    Args:
        stat: Counter name, e.g. "embeds", "linksCached", "api" or "downloads".
    """
    today = str(date.today())
    counters = ("embeds", "linksCached", "api", "downloads")
    # `stat` itself must not appear in $setOnInsert: MongoDB rejects an
    # update that targets the same field with two operators.
    seed = {name: 1 for name in counters if name != stat}
    change = {"$inc": {stat: 1}}
    if seed:
        change["$setOnInsert"] = seed
    db.stats.update_one({"date": today}, change, upsert=True)
def getStats(day):
    """Return the stats document whose `date` equals `day`, or None if there is none."""
    return db.stats.find_one({"date": day})
def embed_video(video_link, image=0, raw=False):  # Return Embed from any tweet link
    """Render the embed page for `video_link`.

    The link cache is consulted first; on a miss the link is resolved with
    link_to_vnf() and the result cached. If resolution fails (exception or a
    None result) a "Failed to scan your link!" message page is returned and
    nothing is cached.

    Args:
        video_link: Tweet (or other) URL to embed.
        image: Zero-based index of the image to show for image tweets.
        raw: Passed through to embed().
    """
    cached_vnf = getVnfFromLinkCache(video_link)
    if cached_vnf is not None:
        return embed(video_link, cached_vnf, image, raw)

    try:
        vnf = link_to_vnf(video_link)
        if vnf is None:
            # link_to_vnf signals failure with None; do not cache that.
            return message("Failed to scan your link!")
        addVnfToLinkCache(video_link, vnf)
        return embed(video_link, vnf, image, raw)
    except Exception as e:
        print(e)
        return message("Failed to scan your link!")
def tweetInfo(
    url="",
    tweet="",
    desc="",
    thumb="",
    uploader="",
    screen_name="",
    pfp="",
    tweetType="",
    images="",
    hits=0,
    likes=0,
    rts=0,
    time="",
    qrt=None,
    nsfw=False,
):  # Return a dict of video info with default values
    """Build a VNF dict from the given fields.

    Every argument is optional. `qrt` defaults to a fresh empty dict on each
    call; a literal `{}` default would be one shared object, so mutating the
    `qrt` of one result would leak into every later result.
    """
    if qrt is None:
        qrt = {}
    vnf = {
        "tweet": tweet,
        "url": url,
        "description": desc,
        "thumbnail": thumb,
        "uploader": uploader,
        "screen_name": screen_name,
        "pfp": pfp,
        "type": tweetType,
        "images": images,
        "hits": hits,
        "likes": likes,
        "rts": rts,
        "time": time,
        "qrt": qrt,
        "nsfw": nsfw,
    }
    return vnf
def link_to_vnf_from_api(video_link):
    """Build a VNF dict for `video_link` from the Twitter API.

    The tweet id is the last path segment of the link, with any query string
    removed.

    * Video tweets: `url` is the MP4 variant with the highest bitrate.
    * Text tweets: `url` and `thumbnail` are empty.
    * Image tweets: `images` holds up to four image URLs in slots 0-3 and the
      image count (as a string) in slot 4.

    Raises:
        ValueError: if a video tweet offers no MP4 variant. Callers such as
            link_to_vnf() catch this like any other API failure; previously
            this case surfaced as an unbound-local error.
    """
    print(" ➤ [ + ] Attempting to download tweet info from Twitter API")
    # gets the tweet ID as an int from the passed url
    twid = int(re.sub(r"\?.*$", "", video_link.rsplit("/", 1)[-1]))
    tweet = twitter_api.statuses.show(_id=twid, tweet_mode="extended")
    # For when I need to poke around and see what a tweet looks like
    # print(tweet)
    text = tweet["full_text"]
    nsfw = tweet.get("possibly_sensitive", False)
    kind = tweetType(tweet)
    print(" ➤ [ + ] Tweet Type: " + kind)

    url = ""
    thumb = ""
    imgs = ["", "", "", "", ""]

    if kind == "Video":
        media = tweet["extended_entities"]["media"][0]
        thumb = media["media_url"]
        # Start below zero so a variant with bitrate 0 is still accepted.
        # With a starting value of 0 and a strict ">", a tweet whose only
        # MP4 variant reports bitrate 0 never had its url assigned.
        best_bitrate = -1
        found = False
        for video in media["video_info"]["variants"]:
            if video["content_type"] != "video/mp4":
                continue
            bitrate = video.get("bitrate", 0)
            if bitrate > best_bitrate:
                url = video["url"]
                best_bitrate = bitrate
                found = True
        if not found:
            raise ValueError("No MP4 variant found for tweet " + str(twid))
    elif kind != "Text":
        # Image tweet: slots 0-3 hold the image URLs, slot 4 the count.
        media_items = tweet["extended_entities"]["media"][:4]
        for i, media in enumerate(media_items):
            imgs[i] = media["media_url_https"]
        imgs[4] = str(len(media_items))
        thumb = media_items[0]["media_url_https"]

    qrt = {}
    if "quoted_status" in tweet:
        quoted = tweet["quoted_status"]
        qrt["desc"] = quoted["full_text"]
        qrt["handle"] = quoted["user"]["name"]
        qrt["screen_name"] = quoted["user"]["screen_name"]

    vnf = tweetInfo(
        url,
        video_link,
        text,
        thumb,
        tweet["user"]["name"],
        tweet["user"]["screen_name"],
        tweet["user"]["profile_image_url"],
        kind,
        likes=tweet["favorite_count"],
        rts=tweet["retweet_count"],
        time=tweet["created_at"],
        qrt=qrt,
        images=imgs,
        nsfw=nsfw,
    )
    return vnf
def link_to_vnf_from_youtubedl(video_link):
    """Build a VNF dict for `video_link` from youtube-dl metadata (no download).

    The description has its final space-separated word removed before it is
    stored. Fields youtube-dl does not supply keep tweetInfo()'s defaults.
    """
    print(" ➤ [ X ] Attempting to download tweet info via YoutubeDL: " + video_link)
    ydl_options = {"outtmpl": "%(id)s.%(ext)s"}
    with youtube_dl.YoutubeDL(ydl_options) as ydl:
        result = ydl.extract_info(video_link, download=False)
    return tweetInfo(
        url=result["url"],
        tweet=video_link,
        desc=result["description"].rsplit(" ", 1)[0],
        thumb=result["thumbnail"],
        uploader=result["uploader"],
    )
def link_to_vnf(video_link):  # Return a VideoInfo object or die trying
    """Resolve `video_link` to a VNF dict using the configured method.

    * "hybrid": try the Twitter API; if it raises, fall back to youtube-dl
      (errors from the fallback propagate).
    * "api" / "youtube-dl": use that backend only; return None if it raises.
    * anything else: print a configuration hint and return None.
    """
    method = config["config"]["method"]

    if method == "hybrid":
        try:
            return link_to_vnf_from_api(video_link)
        except Exception as e:
            print(" ➤ [ !!! ] API Failed")
            print(e)
            return link_to_vnf_from_youtubedl(video_link)

    # Single-backend methods: resolver plus the line logged on failure.
    single_backends = {
        "api": (link_to_vnf_from_api, " ➤ [ X ] API Failed"),
        "youtube-dl": (link_to_vnf_from_youtubedl, " ➤ [ X ] Youtube-DL Failed"),
    }
    if method in single_backends:
        resolver, failure_line = single_backends[method]
        try:
            return resolver(video_link)
        except Exception as e:
            print(failure_line)
            print(e)
            return None

    print(
        "Please set the method key in your config file to 'api' 'youtube-dl' or 'hybrid'"
    )
    return None
def getVnfFromLinkCache(video_link):
    """Look `video_link` up in the configured link cache.

    Returns the cached VNF dict, or None on a miss (or when the cache system
    is neither "db" nor "json"). A DB hit also stores the incremented hit
    count and bumps the "embeds" stat; the returned dict still carries the
    hit count as it was read.
    """
    if link_cache_system == "db":
        vnf = db.linkCache.find_one({"tweet": video_link})
        # print(vnf)
        if vnf is None:
            print(" ➤ [ X ] Link not in DB cache")
            return None
        hits = vnf["hits"] + 1
        print(
            f" ➤ [ ✔ ] Link located in DB cache. hits on this link so far: [{hits}]"
        )
        db.linkCache.update_one({"tweet": video_link}, {"$set": {"hits": hits}})
        addToStat("embeds")
        return vnf

    if link_cache_system == "json":
        if video_link not in link_cache:
            print(" ➤ [ X ] Link not in json cache")
            return None
        print("Link located in json cache")
        return link_cache[video_link]

    return None
def addVnfToLinkCache(video_link, vnf):
    """Store `vnf` in the configured link cache.

    Returns True only after a successful DB insert; every other outcome
    (json cache, failed insert, unknown cache system) returns None. The json
    cache rewrites links.json in full on every call.
    """
    if link_cache_system == "json":
        link_cache[video_link] = vnf
        with open("links.json", "w") as outfile:
            json.dump(link_cache, outfile, indent=4, sort_keys=True)
        return None

    if link_cache_system != "db":
        return None

    try:
        db.linkCache.insert_one(vnf)
        print(" ➤ [ + ] Link added to DB cache ")
        addToStat("linksCached")
        return True
    except Exception:
        print(" ➤ [ X ] Failed to add link to DB cache")
        return None
def message(text):
    """Render default.html showing `text`, styled with the site settings from config."""
    site = config["config"]
    return render_template(
        "default.html",
        message=text,
        color=site["color"],
        appname=site["appname"],
        repo=site["repo"],
        url=site["url"],
    )
def embed(video_link, vnf, image, raw=False):
    """Render the embed template that matches the VNF's tweet type.

    Args:
        video_link: The link being embedded (also URL-quoted for the template).
        vnf: VNF dict as produced by tweetInfo(). Keys missing from entries
            cached by older versions are filled with neutral defaults.
        image: Zero-based index into vnf["images"], used for image tweets.
        raw: For image tweets, selects img.html instead of image.html.

    "Text" uses text.html and "Image" uses image.html/img.html. "Video", the
    empty type and any unrecognised type use video.html; previously an
    unrecognised type left the template name unassigned.
    """
    # Older cache entries may predate some keys (the code below already
    # treats a missing "qrt" as an "old VNF object"); make sure the keys read
    # outside that guard exist too.
    legacy_defaults = (
        ("type", ""),
        ("likes", 0),
        ("rts", 0),
        ("time", ""),
        ("screen_name", ""),
        ("pfp", ""),
        ("nsfw", False),
    )
    for key, fallback in legacy_defaults:
        vnf.setdefault(key, fallback)

    kind = vnf["type"]
    print(" ➤ [ E ] Embedding " + kind + ": " + vnf["url"])

    # Drop the trailing t.co link from the description.
    desc = re.sub(r" http.*t\.co\S+", "", vnf["description"])
    urlUser = urllib.parse.quote(vnf["uploader"])
    urlDesc = urllib.parse.quote(desc)
    urlLink = urllib.parse.quote(video_link)
    likeDisplay = "\n\n💖 " + str(vnf["likes"]) + " 🔁 " + str(vnf["rts"]) + "\n"
    imagecount = "Twitter"

    try:
        if kind in ("", "Video"):
            pass  # video embeds keep the bare description
        elif vnf["qrt"] == {}:  # Check if this is a QRT and modify the description
            desc = desc + likeDisplay
        else:
            qrtDisplay = (
                "\n─────────────\n ➤ QRT of "
                + vnf["qrt"]["handle"]
                + " (@"
                + vnf["qrt"]["screen_name"]
                + "):\n─────────────\n'"
                + vnf["qrt"]["desc"]
                + "'"
            )
            desc = desc + qrtDisplay + likeDisplay
    except (KeyError, TypeError):
        # Missing or malformed quote data: keep the plain description and
        # reset the counters, as before.
        vnf["likes"] = 0
        vnf["rts"] = 0
        vnf["time"] = 0
        print(" ➤ [ X ] Failed QRT check - old VNF object")

    # Change the template based on tweet type
    if kind == "Text":
        template = "text.html"
    elif kind == "Image":
        image = vnf["images"][image]
        if vnf["images"][4] != "1":
            imagecount = "Twitter (" + vnf["images"][4] + " images in post)"
        template = "img.html" if raw else "image.html"
    else:
        urlDesc = urllib.parse.quote(
            textwrap.shorten(desc, width=220, placeholder="...")
        )
        template = "video.html"

    color = config["config"]["color"]
    if vnf["nsfw"]:
        color = "#800020"  # Red

    return render_template(
        template,
        likes=vnf["likes"],
        rts=vnf["rts"],
        time=vnf["time"],
        screenName=vnf["screen_name"],
        vidlink=vnf["url"],
        pfp=vnf["pfp"],
        vidurl=vnf["url"],
        desc=desc,
        pic=image,
        imagecount=imagecount,
        user=vnf["uploader"],
        video_link=video_link,
        color=color,
        appname=config["config"]["appname"],
        repo=config["config"]["repo"],
        url=config["config"]["url"],
        urlDesc=urlDesc,
        urlUser=urlUser,
        urlLink=urlLink,
        tweetLink=vnf["tweet"],
    )
def tweetType(tweet):  # Are we dealing with a Video, Image, or Text tweet?
    """Classify a tweet dict as "Video", "Image" or "Text".

    A tweet without "extended_entities" is "Text". Otherwise only the first
    media entry is inspected: "Video" if it has "video_info", else "Image".
    """
    if "extended_entities" not in tweet:
        return "Text"
    first_media = tweet["extended_entities"]["media"][0]
    return "Video" if "video_info" in first_media else "Image"
def oEmbedGen(description, user, video_link, ttype):
    """Return an oEmbed 1.0 payload dict for the given description, author and link.

    The provider name and URL come from the app name and repo in config.
    """
    site = config["config"]
    return {
        "type": ttype,
        "version": "1.0",
        "provider_name": site["appname"],
        "provider_url": site["repo"],
        "title": description,
        "author_name": user,
        "author_url": video_link,
    }
if __name__ == "__main__":
    # The PORT environment variable overrides the default port "80".
    port = os.environ.get("PORT", "80")
    app.config["SERVER_NAME"] = "localhost:" + port
    # Listen on all interfaces.
    app.run(host="0.0.0.0", port=port)