From a29a88a8c6b0cafb55c58555371f4b2983b11579 Mon Sep 17 00:00:00 2001 From: arbadacarba <63317640+arbadacarbaYK@users.noreply.github.com> Date: Sun, 26 May 2024 05:37:18 +0200 Subject: [PATCH] Update pixelateTG.py --- pixelateTG.py | 130 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 83 insertions(+), 47 deletions(-) diff --git a/pixelateTG.py b/pixelateTG.py index 4ff4c7c..a533568 100644 --- a/pixelateTG.py +++ b/pixelateTG.py @@ -1,7 +1,6 @@ import os import cv2 import random -import imageio from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Updater, CallbackContext, CommandHandler, CallbackQueryHandler from concurrent.futures import ThreadPoolExecutor, wait @@ -16,7 +15,7 @@ executor = ThreadPoolExecutor(max_workers=MAX_THREADS) def start(update: Update, context: CallbackContext) -> None: - update.message.reply_text('Send me a picture, GIF, or video, and I will pixelate faces in it!') + update.message.reply_text('Send me a picture, and I will pixelate faces in it!') def detect_heads(image): mtcnn = MTCNN() @@ -82,6 +81,87 @@ def cats_overlay(photo_path, user_id, bot): def clowns_overlay(photo_path, user_id, bot): return overlay(photo_path, user_id, 'clown', RESIZE_FACTOR, bot) +def pixelate_faces(update: Update, context: CallbackContext) -> None: + chat_type = update.effective_chat.type + + if chat_type == "private": + session_id = str(uuid4()) # Generate a unique session ID + context.user_data[session_id] = {'state': 'waiting_for_photo'} + + file_id = update.message.photo[-1].file_id + file = context.bot.get_file(file_id) + file_name = file.file_path.split('/')[-1] + photo_path = f"downloads/{file_name}" + file.download(photo_path) + + # Check if any faces are detected + image = cv2.imread(photo_path) + mtcnn = MTCNN() + faces = mtcnn.detect_faces(image) + + if not faces: + # No faces detected, do nothing + update.message.reply_text('No faces detected in the image.') + return + + keyboard = [ + [InlineKeyboardButton("🤡 Clowns", callback_data=f'clowns_overlay_{session_id}'), + InlineKeyboardButton("😂 Liotta", callback_data=f'liotta_{session_id}'), + InlineKeyboardButton("☠️ Skull", callback_data=f'skull_of_satoshi_{session_id}')], + [InlineKeyboardButton("🐈‍⬛ Cats", callback_data=f'cats_overlay_{session_id}'), + InlineKeyboardButton("🐸 Pepe", callback_data=f'pepe_overlay_{session_id}'), + InlineKeyboardButton("🏆 Chad", callback_data=f'chad_overlay_{session_id}')], + [InlineKeyboardButton("⚔️ Pixel", callback_data=f'pixelate_{session_id}'), + InlineKeyboardButton("CANCEL", callback_data=f'cancel_{session_id}')], # Add Cancel button + ] + reply_markup = InlineKeyboardMarkup(keyboard) + context.user_data[session_id]['photo_path'] = photo_path + context.user_data[session_id]['user_id'] = update.message.from_user.id + + update.message.reply_text('Press until happy', reply_markup=reply_markup) + + # Delete the original picture from the chat + update.message.delete() + +def pixelate_command(update: Update, context: CallbackContext) -> None: + if update.message.reply_to_message and update.message.reply_to_message.photo: + session_id = str(uuid4()) # Generate a unique session ID + context.user_data[session_id] = {'state': 'waiting_for_photo'} + + file_id = update.message.reply_to_message.photo[-1].file_id + file = context.bot.get_file(file_id) + file_name = file.file_path.split('/')[-1] + photo_path = f"downloads/{file_name}" + file.download(photo_path) + + # Check if any faces are detected + image = cv2.imread(photo_path) + mtcnn = MTCNN() + faces = mtcnn.detect_faces(image) + + if not faces: + # No faces detected, do nothing + update.message.reply_text('No faces detected in the image.') + return + + keyboard = [ + [InlineKeyboardButton("🤡 Clowns", callback_data=f'clowns_overlay_{session_id}'), + InlineKeyboardButton("😂 Liotta", callback_data=f'liotta_{session_id}'), + InlineKeyboardButton("☠️ Skull", callback_data=f'skull_of_satoshi_{session_id}')], + [InlineKeyboardButton("🐈‍⬛ Cats", callback_data=f'cats_overlay_{session_id}'), + InlineKeyboardButton("🐸 Pepe", callback_data=f'pepe_overlay_{session_id}'), + InlineKeyboardButton("🏆 Chad", callback_data=f'chad_overlay_{session_id}')], + [InlineKeyboardButton("⚔️ Pixel", callback_data=f'pixelate_{session_id}'), + InlineKeyboardButton("CANCEL", callback_data=f'cancel_{session_id}')], # Add Cancel button + ] + reply_markup = InlineKeyboardMarkup(keyboard) + context.user_data[session_id]['photo_path'] = photo_path + context.user_data[session_id]['user_id'] = update.message.from_user.id + + update.message.reply_text('Press until happy', reply_markup=reply_markup) + else: + update.message.reply_text('This only works as a reply to a picture.') + def process_image(photo_path, user_id, file_id, bot): image = cv2.imread(photo_path) faces = detect_heads(image) @@ -99,50 +179,6 @@ def process_face(x, y, w, h): return processed_path -def process_media(file_path, user_id, file_id, bot): - file_extension = os.path.splitext(file_path)[1].lower() - if file_extension == '.jpg' or file_extension == '.jpeg' or file_extension == '.png': - return process_image(file_path, user_id, file_id, bot) - elif file_extension == '.gif': - return process_gif(file_path, user_id, file_id, bot) - elif file_extension in ['.mp4', '.avi', '.mkv']: - return process_video(file_path, user_id, file_id, bot) - else: - return None # Unsupported file type - -def process_gif(gif_path, user_id, file_id, bot): - # Extract frames from GIF and process each frame - frames = imageio.mimread(gif_path) - processed_frames = [process_image(frame, user_id, file_id, bot) for frame in frames] - # Reconstruct GIF - processed_gif_path = f"processed/{user_id}_{file_id}.gif" - imageio.mimsave(processed_gif_path, processed_frames) - return processed_gif_path - -def process_video(video_path, user_id, file_id, bot): - # Extract frames from video and process each frame - cap = cv2.VideoCapture(video_path) - frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - processed_frames = [] - for _ in range(frame_count): - ret, frame = cap.read() - if not ret: - break - processed_frame = process_image(frame, user_id, file_id, bot) - processed_frames.append(processed_frame) - cap.release() - - # Reconstruct video - processed_video_path = f"processed/{user_id}_{file_id}.mp4" - out = cv2.VideoWriter(processed_video_path, cv2.VideoWriter_fourcc(*'mp4v'), 30, (processed_frames[0].shape[1], processed_frames[0].shape[0])) - for frame in processed_frames: - out.write(frame) - out.release() - - return processed_video_path - - - def button_callback(update: Update, context: CallbackContext) -> None: query = update.callback_query query.answer() @@ -185,8 +221,8 @@ def main() -> None: dispatcher = updater.dispatcher dispatcher.add_handler(CommandHandler("start", start)) - dispatcher.add_handler(MessageHandler(Filters.all & Filters.private, process_media)) dispatcher.add_handler(CommandHandler("pixel", pixelate_command)) + dispatcher.add_handler(MessageHandler(Filters.photo & Filters.private, pixelate_faces)) dispatcher.add_handler(CallbackQueryHandler(button_callback)) updater.start_polling()