Advanced replay buffer

upgradeQ

Member
Hello guys, I've found out how you can make OBS pipe its recording output into Python/Go/C etc. on Windows.
This allows you to implement in-RAM recordings or an instant replay that lives entirely in RAM.
The Media Source can read from a named pipe as well.

Unlike the Replay Source, this method allows for HEVC encoding.

I've written a Python server to showcase the basic functionality:
1) Stream the buffer to an OBS media source as-is - you can't seek backward or forward.
2) Open the buffer in mpv, so you can seek, assuming the buffer setting in mpv is large enough.

To run it, OBS needs a special output mode and custom settings:
First, go to Settings > Output > set Output Mode to Advanced > Recording tab > set Type to Custom Output (FFmpeg)
Set FFmpeg Output Type to Output to URL > File path or URL > \\.\pipe\obs_piping_to_buffer
Container format > mpegts (VERY IMPORTANT)

Video bitrate > 50000 Kbps

Check > Show all codecs (even if potentially incompatible)
Select > hevc_nvenc, or the AMD or Intel variant

Video encoder settings > similar to the advanced encoder settings from normal mode, for example:
preset=p7 tune=hq profile=main multipass=2 rc-lookahead=32 spatial-aq=1 temporal-aq=1 bf=4 b_ref_mode=middle g=120 rc=vbr b=50M maxrate=60M bufsize=120M

Now, you can open the buffer in mpv with this command: .\mpv.exe http://localhost:1688/video
Or use a scene item Media Source with the address \\.\pipe\piping_to_media_source and start playback via PowerShell: Invoke-RestMethod -Uri "http://localhost:1688/play"

Here is the full script; you can take it further, e.g. by adding multiple replays, doing in-memory ffmpeg conversions, or rewriting it in a non-GC language...
Python:
import ctypes
import threading
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from contextlib import contextmanager

# Replay buffer capacity in megabytes; multiplied by 1024 * 1024 where the
# ring buffer is created.
MB = 200
# Size in bytes of one read from the OBS pipe; also passed as the in/out
# buffer size when the named pipes are created.
RB_CHUNK = 1 << 20
# Pipe that OBS writes its recording into (set as the output URL in OBS).
PIPE_IN = r"\\.\pipe\obs_piping_to_buffer"
# Pipe that the OBS media source reads the replay from.
PIPE_OUT = r"\\.\pipe\piping_to_media_source"

# kernel32 is called through ctypes without declared prototypes.
KERNEL32 = ctypes.windll.kernel32
# Size in bytes of each WriteFile call when replaying to PIPE_OUT.
BUFFER_SIZE = 64 * 1024
# Win32 constants used with CreateNamedPipeW / ConnectNamedPipe.
PIPE_ACCESS_INBOUND = 0x1
PIPE_ACCESS_OUTBOUND = 0x2
PIPE_TYPE_BYTE = 0x0
INVALID_HANDLE = -1
# Last-error value meaning a client connected before ConnectNamedPipe ran.
ERROR_PIPE_CONNECTED = 535


@contextmanager
def pipe_server(name, mode):
    """Create a named pipe, wait for one client, and yield the pipe handle.

    The pipe is created in byte mode with a single allowed instance and
    RB_CHUNK-sized in/out buffers. The call blocks inside ConnectNamedPipe
    until a client opens the pipe. On exit the pipe is flushed, disconnected
    and closed, whether or not the body raised.

    Args:
        name: Pipe path, e.g. r"\\\\.\\pipe\\some_name".
        mode: Open-mode flag handed to CreateNamedPipeW
            (PIPE_ACCESS_INBOUND or PIPE_ACCESS_OUTBOUND).

    Yields:
        The handle returned by CreateNamedPipeW, with a client connected.

    Raises:
        OSError: The pipe could not be created, or waiting for the client
            failed with anything other than ERROR_PIPE_CONNECTED.
    """
    h = KERNEL32.CreateNamedPipeW(
        name, mode, PIPE_TYPE_BYTE, 1, RB_CHUNK, RB_CHUNK, 0, None
    )
    if h == INVALID_HANDLE:
        # A @contextmanager generator has to yield exactly once or raise;
        # returning silently makes contextlib raise an opaque
        # RuntimeError("generator didn't yield"), so report the real cause.
        err = KERNEL32.GetLastError()
        raise OSError(f"CreateNamedPipeW failed for {name!r} (error {err})")
    try:
        if KERNEL32.ConnectNamedPipe(h, None) == 0:
            # A zero result with ERROR_PIPE_CONNECTED only means the client
            # connected before we started waiting; that is still a success.
            err = KERNEL32.GetLastError()
            if err != ERROR_PIPE_CONNECTED:
                raise OSError(
                    f"ConnectNamedPipe failed for {name!r} (error {err})"
                )
        yield h
    finally:
        KERNEL32.FlushFileBuffers(h)
        KERNEL32.DisconnectNamedPipe(h)
        KERNEL32.CloseHandle(h)


class RingBuffer:
    """Fixed-capacity byte ring that keeps the most recently written bytes.

    Writers append with write(). A reader calls get_chunks(), which freezes
    the buffer (further writes are dropped) and returns zero-copy views of
    the stored bytes in chronological order. When the reader is done it
    calls release(), which empties the buffer and lets writes resume.
    Several readers may hold the buffer at once; it is only reset when the
    last of them releases it.

    Attributes:
        data: Backing bytearray of `capacity` bytes.
        head: Index in `data` where the next byte will be written.
        size: Number of valid bytes currently stored (at most the capacity).
        frozen: True while at least one reader holds views from get_chunks().
        lock: Guards every field above.
        play_event: Event used by the owner to request a replay; the class
            itself never sets or waits on it.
    """

    def __init__(self, capacity):
        """Allocate a ring of `capacity` bytes.

        Raises:
            ValueError: If `capacity` is not positive (a zero-size ring
                cannot compute its wrap-around position).
        """
        if capacity <= 0:
            raise ValueError("capacity must be a positive number of bytes")
        self.data = bytearray(capacity)
        self.head = 0
        self.size = 0
        self.frozen = False
        self.lock = threading.Lock()
        self.play_event = threading.Event()
        # Number of get_chunks() calls not yet matched by release().
        self._readers = 0

    def write(self, source_buf, length):
        """Append the first `length` bytes of `source_buf`, overwriting the oldest.

        Does nothing while the buffer is frozen or when `length` is not
        positive. If more bytes are supplied than the ring can hold, only
        the newest `capacity` bytes are kept.

        Args:
            source_buf: Any object supporting the buffer protocol.
            length: Number of leading bytes of `source_buf` to store.
        """
        if length <= 0:
            return
        with self.lock:
            if self.frozen:
                return

            cap = len(self.data)
            src = memoryview(source_buf)[:length]
            # The slice clamps to the source size; follow it so the slice
            # assignments below never change the bytearray's length.
            length = len(src)
            if length > cap:
                # Only the newest `cap` bytes can survive, so drop the
                # front of the source instead of its tail.
                src = src[length - cap :]
                length = cap

            remain = cap - self.head
            if length <= remain:
                self.data[self.head : self.head + length] = src
            else:
                # Wrap around: fill up to the end, then continue at index 0.
                self.data[self.head :] = src[:remain]
                self.data[: length - remain] = src[remain:]

            self.head = (self.head + length) % cap
            self.size = min(self.size + length, cap)

    def get_chunks(self):
        """Freeze the buffer and return its contents as memoryview chunks.

        Returns:
            An empty list when nothing is stored (the buffer is then NOT
            frozen and no release() is owed). Otherwise a list of one or
            two memoryviews over `data` that, concatenated in order, give
            the stored bytes from oldest to newest. Each non-empty result
            must be paired with one release() call.
        """
        with self.lock:
            if self.size == 0:
                return []
            self.frozen = True
            self._readers += 1
            mv = memoryview(self.data)
            if self.size < len(self.data):
                # Not wrapped yet: valid bytes are data[0:head].
                return [mv[: self.head]]
            # Full ring: the oldest byte sits at head.
            return [mv[self.head :], mv[: self.head]]

    def release(self):
        """Give back a get_chunks() hold; empty and unfreeze on the last one.

        While another reader still holds the buffer this only drops the
        caller's hold, so that reader's views are not overwritten mid-stream.
        Calling it with no outstanding hold resets the buffer.
        """
        with self.lock:
            if self._readers > 0:
                self._readers -= 1
            if self._readers > 0:
                return
            self.head = 0
            self.size = 0
            self.frozen = False


# Single process-wide replay buffer, shared by the recorder thread, the pipe
# sender thread and the HTTP handler.
G = RingBuffer(capacity=MB * 1024 * 1024)


def recorder():
    """Read OBS's output from PIPE_IN forever and append it to the buffer G.

    Each iteration serves one client connection on the inbound pipe and
    copies whatever ReadFile delivers into G until the read fails or
    returns zero bytes (the writer closed the pipe). The pipe is then
    recreated after a short pause so OBS can reconnect. Never returns.
    """
    buf = ctypes.create_string_buffer(RB_CHUNK)
    n = ctypes.c_uint32()
    while True:
        try:
            with pipe_server(PIPE_IN, PIPE_ACCESS_INBOUND) as h:
                while (
                    KERNEL32.ReadFile(h, buf, RB_CHUNK, ctypes.byref(n), None)
                    and n.value > 0
                ):
                    G.write(buf, n.value)
        except (OSError, RuntimeError) as exc:
            # A failed pipe create/connect surfaces as an exception from the
            # `with` statement (contextlib raises RuntimeError when the
            # generator never yields; OSError is caught as well in case the
            # pipe layer reports failures that way). Without this handler
            # the thread would die and recording would stop for good.
            print(f"recorder: pipe {PIPE_IN} unavailable, retrying ({exc})")
            time.sleep(1.0)  # back off so a persistent failure does not spin
        time.sleep(0.1)


def _write_chunks_to_pipe(handle, chunks):
    """Write every chunk to `handle` in BUFFER_SIZE pieces.

    Returns:
        True if all WriteFile calls succeeded, False as soon as one fails
        (for example because the reader closed the pipe).
    """
    written = ctypes.c_uint32()
    for chunk in chunks:
        for start in range(0, len(chunk), BUFFER_SIZE):
            part = chunk[start : start + BUFFER_SIZE]
            ok = KERNEL32.WriteFile(
                handle,
                # Zero-copy: expose the memoryview slice as a C char array.
                (ctypes.c_char * len(part)).from_buffer(part),
                len(part),
                ctypes.byref(written),
                None,
            )
            if not ok:
                return False
    return True


def sender_worker():
    """Replay the buffer G into PIPE_OUT each time G.play_event is set.

    Waits for the play event, freezes G via get_chunks(), serves one client
    on the outbound pipe, and streams the frozen contents to it. G is
    released afterwards in every case, so recording resumes even if the
    pipe could not be opened or the client went away. Never returns.
    """
    while True:
        G.play_event.wait()
        G.play_event.clear()
        chunks = G.get_chunks()
        if not chunks:
            # Empty buffer: get_chunks() froze nothing, so there is nothing
            # to release, and releasing could reset a buffer that another
            # reader has acquired in the meantime.
            continue
        try:
            with pipe_server(PIPE_OUT, PIPE_ACCESS_OUTBOUND) as h:
                if not _write_chunks_to_pipe(h, chunks):
                    print("sender: client stopped reading, replay aborted")
        except (OSError, RuntimeError) as exc:
            # Pipe create/connect failures arrive as exceptions from the
            # `with` statement; keep the worker alive for the next request.
            print(f"sender: pipe {PIPE_OUT} unavailable ({exc})")
        finally:
            # Always unfreeze, otherwise one failed replay would stop
            # recording permanently.
            G.release()


class ReplayHandler(BaseHTTPRequestHandler):
    """HTTP front end for the replay buffer G.

    Routes:
        GET /play  - sets G.play_event and answers 200 with no body.
        GET /video - streams the frozen buffer contents as video/mp2t, or
                     answers 503 when the buffer is empty.
        anything else - 404.
    """

    def do_GET(self):
        """Dispatch a GET request to one of the routes described on the class."""
        if self.path == "/play":
            G.play_event.set()
            self.send_response(200)
            self.end_headers()
        elif self.path == "/video":
            chunks = G.get_chunks()
            if not chunks:
                # Nothing was frozen by get_chunks(), so there is nothing
                # to release here.
                self.send_error(503)
                return

            # Headers are sent inside the try block as well: if the client
            # is already gone, the buffer must still be released, otherwise
            # it would stay frozen and recording would stop.
            try:
                self.send_response(200)
                self.send_header("Content-Type", "video/mp2t")
                self.end_headers()
                for chunk in chunks:
                    self.wfile.write(chunk)
            except OSError:
                # The client (e.g. mpv) closed the connection mid-stream;
                # that is expected and not worth a traceback. Other
                # exception types are left to the server's error handling.
                pass
            finally:
                G.release()
        else:
            self.send_error(404)


if __name__ == "__main__":
    # Both workers run as daemon threads, so they do not keep the process
    # alive once the HTTP server below stops.
    for worker in (recorder, sender_worker):
        threading.Thread(target=worker, daemon=True).start()

    print(f"Replay buffer: {MB}MB")
    print("Stream URL: http://localhost:1688/video")

    # NOTE(review): "0.0.0.0" listens on every network interface, so other
    # machines on the network can fetch /video and trigger /play; confirm
    # this is intended, since the printed URL only mentions localhost.
    server = HTTPServer(("0.0.0.0", 1688), ReplayHandler)
    server.serve_forever()
 
Back
Top