Recall

最近 B 站刷到某 不要脸 大V团队盗用越南开发者的视频转字符的脚本,并且特意将署名删掉的视频。其实这已经不是第一次了,当时就在想,如果我自己来做,能否成功呢?

但在打开文件夹时,发现一年前就已经写过一个版本,而且已经传上至-->  github  <-- 。但现在来看写得惨不忍睹(不是)

 

效果展示show

再次运行下,我们先看下效果:

 

使用说明

step 1

 

创建虚拟环境:

启动虚拟环境:

win:

.\venv_win\Scripts\activate

macOS:

source ./venv_win/bin/activate

step 2

如果你不需要 UI, 直接运行:

 pip install -r requirements.txt

如果你需要 UI,请额外安装 pyside6 :

step 3 (optional)

install ffmpeg 下载地址 ffmpeg 下载之后将 ffmpeg.exe, ffprobe.exe (macOS则是:ffmpeg, ffprobe, 或通过homebrew安装)放到本项目根目录下。如果你已经在使用ffmpeg并且已经添加到 path,可跳过此步骤

step 4

运行:

python video_to_char.py --VIDEO "test.MP4"

查看其它可用参数:

python video_to_char.py -h

核心代码:

此处是核心部分【video_to_char.py】其余包装部分请移步至-->  github  <--

import os
import argparse
import subprocess
import multiprocessing
from pathlib import Path

import psutil
from PIL import Image
from PIL import ImageFont, ImageDraw
from typing import NewType, Union, Tuple, List

FRAME = NewType("FRAME", Image)

ascii_char = list('''$@B%8&WM#*oahkbdpqwmZO0QLCJUYXzcvunxrjft/\|()1{}[]?-_+~<>i!lI;:,\"^`'. ''')


def get_char(r, g, b, alpha=256) -> str:
    """
    BT.709 标准 https://www.itu.int/dms_pubrec/itu-r/rec/bt/R-REC-BT.709-6-201506-I!!PDF-E.pdf 
    page 4, item 3.2

    """
    if alpha == 0:
        return ' '
    length = len(ascii_char)
    gray = int(0.2126 * r + 0.7152 * g + 0.0722 * b)  

    unit = (256.0 + 1)/length
    return ascii_char[int(gray/unit)]


def draw_text(text: str,
              text_color: Union[str, tuple],
              text_size: int,
              frame: FRAME,
              location: tuple, font: str = "w6.ttf") -> None:
    font = ImageFont.truetype(font, text_size)
    draw = ImageDraw.Draw(frame)
    draw.text(location, text, fill=text_color, font=font)


def pixelate_image_info(frame: FRAME, block_size: int) -> Tuple[FRAME, list]:
    # 确保块大小是整数且大于0
    block_size = int(block_size)
    if block_size <= 0:
        raise ValueError("block_size must be a positive integer")

    # 调整图片大小,使其成为块的整数倍
    width, height = frame.size
    new_width = int(width / block_size) * block_size
    new_height = int(height / block_size) * block_size
    frame = frame.resize((new_width, new_height))

    # 将图片分割成块
    coordinate_colors = list()
    pixels = frame.load()
    for i in range(0, new_width, block_size):
        for j in range(0, new_height, block_size):
            block = frame.crop((i, j, i + block_size, j + block_size))

            # 计算块的平均颜色
            avg_color = block.resize((1, 1)).getpixel((0, 0))
            # # (color, x, y)
            coordinate_colors.append((avg_color, i, j))

            # # 用平均颜色填充整个块
            for x in range(i, i + block_size):
                for y in range(j, j + block_size):
                    pixels[x, y] = avg_color

    return frame, coordinate_colors


def transfer_to_text(frame_src: str,
                     out_picture: str,
                     block_size: int = 10,
                     bg_color: str = "white",
                     text_color: str = "auto",
                     width: int = 0,
                     height: int = 0,
                     mosaic: bool = False,
                     text_size: int = 5) -> None:

    im = Image.open(frame_src)

    pixelate_image = pixelate_image_info(im, block_size)[0] if mosaic else im

    original_width = pixelate_image.width
    original_height = pixelate_image.height
    default_resized_width = 100

    actual_resized_width = default_resized_width if 0 in (width, height) else width
    actual_resized_height = int(default_resized_width * original_height / original_width) if 0 in (width, height) else height

    pixelate_image = pixelate_image.resize((actual_resized_width, actual_resized_height), Image.NEAREST)
    new_width, new_height = pixelate_image.width, pixelate_image.height

    # pixelate_image = np.array(pixelate_image)
    infos = list()
    for i in range(actual_resized_height):
        for j in range(actual_resized_width):
            pixel_color = pixelate_image.getpixel((j, i))
            text = get_char(*pixel_color)
            # text = get_char(*pixelate_image[j, i])
            infos.append(((i, j), text, pixel_color))

    new_image = Image.new("RGB", (new_width * text_size, new_height * text_size), color=bg_color)
    for info in infos:
        coordinate, text, color = info
        _color = color if text_color == "auto" else text_color
        draw_text(text=text, text_color=_color, text_size=text_size, frame=new_image, location=(coordinate[1] * text_size, coordinate[0] * text_size))
    new_image.save(f"{out_picture}")


class WorkerProcess(multiprocessing.Process):

    def __init__(self,
                 image_queue: multiprocessing.Queue,
                 input_folder: str,
                 output_folder: str,
                 text_color: str = "auto",
                 bg_color: str="white",
                 mosaic: bool = False):

        super().__init__()
        self.image_queue = image_queue
        self.input_folder = input_folder
        self.output_folder = output_folder
        self.text_color = text_color
        self.mosaic = mosaic
        self.bg_color = bg_color

    def run(self):
        while True:
            # 从队列中获取图片路径
            image_path = self.image_queue.get()
            # 如果队列为空,则结束循环
            if image_path is None:
                break
            # 处理图片
            transfer_to_text(frame_src=f"{self.input_folder}/{image_path}",
                             out_picture=f"{self.output_folder}/out_{image_path}",
                             text_size=10,
                             block_size=20,
                             text_color=self.text_color,
                             bg_color=self.bg_color,
                             mosaic=self.mosaic)
            print(f"{self.input_folder}/{image_path} 处理完成! -----> {self.output_folder}/out_{image_path}")


def frame_transfer_multiprocessor(input_folder: str,
                                  output_folder: str,
                                  process_num: int = 10,
                                  text_color: str = "auto",
                                  bg_color="white",
                                  mosaic: bool = False) -> List[multiprocessing.Process]:
    # 创建一个队列
    image_queue = multiprocessing.Queue()
    for frame in sorted([file for file in os.listdir(input_folder) if file.endswith("jpg")]):
        image_queue.put(frame)

    # 创建并启动进程
    num_worker_processes = process_num
    transfer_processes = []
    for i in range(num_worker_processes):
        _process = WorkerProcess(image_queue, input_folder, output_folder, text_color=text_color, bg_color=bg_color, mosaic=mosaic)
        _process.start()
        transfer_processes.append(_process)

    for i in range(num_worker_processes):
        image_queue.put(None)

    for _process in transfer_processes:
        _process.join()

    print("All images have been processed.")
    return transfer_processes


if __name__ in "__main__":
    parser = argparse.ArgumentParser(description="Transfer a normal video into ASCII one")

    parser.add_argument("--VIDEO", type=str, help="Path to the video file")
    parser.add_argument("--TEXT_COLOR", type=str, default="auto",
                        help="Color of the text overlay. If 'auto', it will be automatically calculated.")
    parser.add_argument("--BG_COLOR", type=str, default="white",
                        help="Background color")
    parser.add_argument("--MOSAIC", type=str, help="Whether to apply mosaic effect", default="no")
    parser.add_argument("--PROCESS_NUM", type=int, default=10, help="Number of processes to use")
    parser.add_argument("--DELETE_FRAMES_AFTER_PROCESSED", type=str, default="no",
                        help="Whether to delete frames after processing")

    # parse args
    args = parser.parse_args()
    video_path = Path(args.VIDEO).resolve()
    VIDEO = f"{video_path}"
    TEXT_COLOR = args.TEXT_COLOR  # 如果是auto,则自动计算颜色,提取自原视频
    BG_COLOR = args.BG_COLOR
    MOSAIC = True if args.MOSAIC == "yes" else False
    PROCESS_NUM = args.PROCESS_NUM
    DELETE_FRAMES_AFTER_PROCESSED = True if args.DELETE_FRAMES_AFTER_PROCESSED == "yes" else False



    installed_at = Path(__file__).resolve().parent
    print(f"install {installed_at}-------------------{VIDEO}-----------------{psutil.WINDOWS}")

    # 预设原视频帧文件夹/处理帧文件夹
    tmp_frames_folder = f"{installed_at}/tmp_frames"
    out_frames_folder = f"{installed_at}/out_frames"
    if not os.path.exists(tmp_frames_folder):
        os.mkdir(tmp_frames_folder)
    if not os.path.exists(out_frames_folder):
        os.mkdir(out_frames_folder)


    # 获取视频帧率
    ffprobe_path = f"{installed_at}/ffprobe.exe" if psutil.WINDOWS else f"{installed_at}/ffprobe"
    command_get_rate = f"{ffprobe_path} -v error -select_streams v:0 -show_entries stream=r_frame_rate -of default=noprint_wrappers=1:nokey=1 {VIDEO}"

    process = subprocess.Popen(command_get_rate, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, error = process.communicate()

    rate, sec = output.decode('utf-8').split("/")
    VIDEO_RATE = int(int(rate) / int(sec))
    print(VIDEO_RATE, psutil.WINDOWS)


    #视频处理
    ffmpeg_path = f"{installed_at}/ffmpeg.exe" if psutil.WINDOWS else f"{installed_at}/ffmpeg"

    os.system(f"{ffmpeg_path} -i {VIDEO} -qscale:v 1 -qmin 1 -qmax 1 -vsync 0 {installed_at}/tmp_frames/frame%08d.jpg")
    processors = frame_transfer_multiprocessor(input_folder=tmp_frames_folder,
                                               output_folder=out_frames_folder,
                                               process_num=PROCESS_NUM,
                                               text_color=TEXT_COLOR,
                                               bg_color=BG_COLOR,
                                               mosaic=MOSAIC)


    command_rebuild_video = f"{ffmpeg_path} -r {VIDEO_RATE} -i {installed_at}/out_frames/out_frame%08d.jpg -i {VIDEO} -map 0:v:0 -map 1:a:0 -c:a copy -c:v libx264 -pix_fmt yuv420p {video_path.parent}/out_{'mosaic' if MOSAIC else ''}_{video_path.name}"
    # os.system()
    rebuild_process = subprocess.Popen(command_rebuild_video, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, error = rebuild_process.communicate()
    print(f"Process Finished... result:{output}, err: {error}")
    if DELETE_FRAMES_AFTER_PROCESSED:
        for tmp_frame in [f"{tmp_frames_folder}/{file}" for file in os.listdir(tmp_frames_folder)]:
            os.remove(tmp_frame)

        for out_frame in [f"{out_frames_folder}/{file}" for file in os.listdir(out_frames_folder)]:
            os.remove(out_frame)

 

To Do:

自动检测系统、 ffmpeg/ffmpeg 安装情况、