Skip to content

Commit

Permalink
Update pixelateTG.py
Browse files Browse the repository at this point in the history
  • Loading branch information
arbadacarbaYK authored May 26, 2024
1 parent 4cd8ee6 commit a29a88a
Showing 1 changed file with 83 additions and 47 deletions.
130 changes: 83 additions & 47 deletions pixelateTG.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import cv2
import random
import imageio
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Updater, CallbackContext, CommandHandler, CallbackQueryHandler
from concurrent.futures import ThreadPoolExecutor, wait
Expand All @@ -16,7 +15,7 @@
executor = ThreadPoolExecutor(max_workers=MAX_THREADS)

def start(update: Update, context: CallbackContext) -> None:
update.message.reply_text('Send me a picture, GIF, or video, and I will pixelate faces in it!')
update.message.reply_text('Send me a picture, and I will pixelate faces in it!')

def detect_heads(image):
mtcnn = MTCNN()
Expand Down Expand Up @@ -82,6 +81,87 @@ def cats_overlay(photo_path, user_id, bot):
def clowns_overlay(photo_path, user_id, bot):
return overlay(photo_path, user_id, 'clown', RESIZE_FACTOR, bot)

def pixelate_faces(update: Update, context: CallbackContext) -> None:
chat_type = update.effective_chat.type

if chat_type == "private":
session_id = str(uuid4()) # Generate a unique session ID
context.user_data[session_id] = {'state': 'waiting_for_photo'}

file_id = update.message.photo[-1].file_id
file = context.bot.get_file(file_id)
file_name = file.file_path.split('/')[-1]
photo_path = f"downloads/{file_name}"
file.download(photo_path)

# Check if any faces are detected
image = cv2.imread(photo_path)
mtcnn = MTCNN()
faces = mtcnn.detect_faces(image)

if not faces:
# No faces detected, do nothing
update.message.reply_text('No faces detected in the image.')
return

keyboard = [
[InlineKeyboardButton("🤡 Clowns", callback_data=f'clowns_overlay_{session_id}'),
InlineKeyboardButton("😂 Liotta", callback_data=f'liotta_{session_id}'),
InlineKeyboardButton("☠️ Skull", callback_data=f'skull_of_satoshi_{session_id}')],
[InlineKeyboardButton("🐈‍⬛ Cats", callback_data=f'cats_overlay_{session_id}'),
InlineKeyboardButton("🐸 Pepe", callback_data=f'pepe_overlay_{session_id}'),
InlineKeyboardButton("🏆 Chad", callback_data=f'chad_overlay_{session_id}')],
[InlineKeyboardButton("⚔️ Pixel", callback_data=f'pixelate_{session_id}'),
InlineKeyboardButton("CANCEL", callback_data=f'cancel_{session_id}')], # Add Cancel button
]
reply_markup = InlineKeyboardMarkup(keyboard)
context.user_data[session_id]['photo_path'] = photo_path
context.user_data[session_id]['user_id'] = update.message.from_user.id

update.message.reply_text('Press until happy', reply_markup=reply_markup)

# Delete the original picture from the chat
update.message.delete()

def pixelate_command(update: Update, context: CallbackContext) -> None:
if update.message.reply_to_message and update.message.reply_to_message.photo:
session_id = str(uuid4()) # Generate a unique session ID
context.user_data[session_id] = {'state': 'waiting_for_photo'}

file_id = update.message.reply_to_message.photo[-1].file_id
file = context.bot.get_file(file_id)
file_name = file.file_path.split('/')[-1]
photo_path = f"downloads/{file_name}"
file.download(photo_path)

# Check if any faces are detected
image = cv2.imread(photo_path)
mtcnn = MTCNN()
faces = mtcnn.detect_faces(image)

if not faces:
# No faces detected, do nothing
update.message.reply_text('No faces detected in the image.')
return

keyboard = [
[InlineKeyboardButton("🤡 Clowns", callback_data=f'clowns_overlay_{session_id}'),
InlineKeyboardButton("😂 Liotta", callback_data=f'liotta_{session_id}'),
InlineKeyboardButton("☠️ Skull", callback_data=f'skull_of_satoshi_{session_id}')],
[InlineKeyboardButton("🐈‍⬛ Cats", callback_data=f'cats_overlay_{session_id}'),
InlineKeyboardButton("🐸 Pepe", callback_data=f'pepe_overlay_{session_id}'),
InlineKeyboardButton("🏆 Chad", callback_data=f'chad_overlay_{session_id}')],
[InlineKeyboardButton("⚔️ Pixel", callback_data=f'pixelate_{session_id}'),
InlineKeyboardButton("CANCEL", callback_data=f'cancel_{session_id}')], # Add Cancel button
]
reply_markup = InlineKeyboardMarkup(keyboard)
context.user_data[session_id]['photo_path'] = photo_path
context.user_data[session_id]['user_id'] = update.message.from_user.id

update.message.reply_text('Press until happy', reply_markup=reply_markup)
else:
update.message.reply_text('This only works as a reply to a picture.')

def process_image(photo_path, user_id, file_id, bot):
image = cv2.imread(photo_path)
faces = detect_heads(image)
Expand All @@ -99,50 +179,6 @@ def process_face(x, y, w, h):

return processed_path

def process_media(file_path, user_id, file_id, bot):
file_extension = os.path.splitext(file_path)[1].lower()
if file_extension == '.jpg' or file_extension == '.jpeg' or file_extension == '.png':
return process_image(file_path, user_id, file_id, bot)
elif file_extension == '.gif':
return process_gif(file_path, user_id, file_id, bot)
elif file_extension in ['.mp4', '.avi', '.mkv']:
return process_video(file_path, user_id, file_id, bot)
else:
return None # Unsupported file type

def process_gif(gif_path, user_id, file_id, bot):
# Extract frames from GIF and process each frame
frames = imageio.mimread(gif_path)
processed_frames = [process_image(frame, user_id, file_id, bot) for frame in frames]
# Reconstruct GIF
processed_gif_path = f"processed/{user_id}_{file_id}.gif"
imageio.mimsave(processed_gif_path, processed_frames)
return processed_gif_path

def process_video(video_path, user_id, file_id, bot):
# Extract frames from video and process each frame
cap = cv2.VideoCapture(video_path)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
processed_frames = []
for _ in range(frame_count):
ret, frame = cap.read()
if not ret:
break
processed_frame = process_image(frame, user_id, file_id, bot)
processed_frames.append(processed_frame)
cap.release()

# Reconstruct video
processed_video_path = f"processed/{user_id}_{file_id}.mp4"
out = cv2.VideoWriter(processed_video_path, cv2.VideoWriter_fourcc(*'mp4v'), 30, (processed_frames[0].shape[1], processed_frames[0].shape[0]))
for frame in processed_frames:
out.write(frame)
out.release()

return processed_video_path



def button_callback(update: Update, context: CallbackContext) -> None:
query = update.callback_query
query.answer()
Expand Down Expand Up @@ -185,8 +221,8 @@ def main() -> None:
dispatcher = updater.dispatcher

dispatcher.add_handler(CommandHandler("start", start))
dispatcher.add_handler(MessageHandler(Filters.all & Filters.private, process_media))
dispatcher.add_handler(CommandHandler("pixel", pixelate_command))
dispatcher.add_handler(MessageHandler(Filters.photo & Filters.private, pixelate_faces))
dispatcher.add_handler(CallbackQueryHandler(button_callback))

updater.start_polling()
Expand Down

0 comments on commit a29a88a

Please sign in to comment.