Python Scripting - Pulling audio source information

someducky

New Member
Hi!

I've searched around in the guides, forum, and documentation for this, with no luck.
I'm trying to pull information (the decibel level, to be precise) for a specific audio source, like a microphone for example.
I know there's a flag for when the source is muted, but that's all I can find.

Is it possible to get more information about an audio source from the script API at all? And if so, how?
I've been dead stuck on this for a while, so any help would be much appreciated.
Thanks in advance.
 

SirEdric

New Member
You're not the only one looking for a solution....:-/
 

upgradeQ

Member
The first method is to use the move transition plugin by @Exeldro, which can redirect the audio volume levels that the volmeter receives to any source, e.g. a custom Lua source with just one setting, and then read that setting via Python.
The second method is to use a ctypes wrapper; however, it requires an additional step on GNU/Linux.
Here is what I came up with (MIT Licence) :


Python:
import obspython as S  # studio
from types import SimpleNamespace
from ctypes import *
from ctypes.util import find_library

import os  # used only here, to pick the library name per platform

# find_library("obs") can come back as None on Windows, which CDLL cannot
# load; the bare name "obs" is what resolves there. Other platforms keep the
# find_library() result unchanged.
_libobs_name = find_library("obs")
if _libobs_name is None and os.name == "nt":
    _libobs_name = "obs"

# Handle to libobs; wrap() looks functions up on it.
obsffi = CDLL(_libobs_name)
# Shared mutable state for the whole script.
G = SimpleNamespace()


def wrap(funcname, restype, argtypes):
    """Declare the C signature of ``obsffi.<funcname>`` and expose it as ``g_<funcname>``.

    funcname: name of the function to look up on ``obsffi``.
    restype:  ctypes return type (``None`` for void).
    argtypes: list of ctypes argument types.
    """
    c_function = getattr(obsffi, funcname)
    c_function.argtypes = argtypes
    c_function.restype = restype
    # Publish under a module-level name so callers can use g_<funcname>(...).
    globals()[f"g_{funcname}"] = c_function


class Source(Structure):
    """Opaque ctypes placeholder; only ever used through ``POINTER(Source)``."""


class Volmeter(Structure):
    """Opaque ctypes placeholder; only ever used through ``POINTER(Volmeter)``."""


# C callback type: returns nothing, takes a void* plus three float* arguments.
_float_p = POINTER(c_float)
volmeter_callback_t = CFUNCTYPE(None, c_void_p, _float_p, _float_p, _float_p)

# (function name, return type, argument types) for every libobs call used
# below; each row becomes a module-level g_<name> function via wrap().
for _signature in (
    ("obs_get_source_by_name", POINTER(Source), [c_char_p]),
    ("obs_source_release", None, [POINTER(Source)]),
    ("obs_volmeter_create", POINTER(Volmeter), [c_int]),
    ("obs_volmeter_destroy", None, [POINTER(Volmeter)]),
    (
        "obs_volmeter_add_callback",
        None,
        [POINTER(Volmeter), volmeter_callback_t, c_void_p],
    ),
    (
        "obs_volmeter_remove_callback",
        None,
        [POINTER(Volmeter), volmeter_callback_t, c_void_p],
    ),
    ("obs_volmeter_attach_source", c_bool, [POINTER(Volmeter), POINTER(Source)]),
):
    wrap(*_signature)


@volmeter_callback_t
def volmeter_callback(data, mag, peak, input):
    """Volmeter callback: store the first ``peak`` value in ``G.noise``.

    Only element 0 of ``peak`` is read; ``data``, ``mag`` and ``input`` are
    accepted to match the callback signature and otherwise ignored.
    """
    first_peak = peak[0]
    G.noise = float(first_peak)


def output_to_file(volume):
    """Overwrite the status text file with ``str(volume)``."""
    text = str(volume)
    status_file = open(
        "current_db_volume_of_source_status.txt", mode="w", encoding="utf-8"
    )
    with status_file:
        status_file.write(text)


# Fader type value passed to obs_volmeter_create() in event_loop().
OBS_FADER_LOG = 2
# Set to True by event_loop() once the volmeter is attached to the source.
G.lock = False
# event_loop() only starts working once G.duration exceeds this value
# (its docstring describes the wait as seconds).
G.start_delay = 3
# Running total that event_loop() advances by G.tick_mili per call.
G.duration = 0
# Last level stored by volmeter_callback(); 999 is what is reported until
# the first callback arrives.
G.noise = 999
# Period handed to S.timer_add() for event_loop()
# (presumably milliseconds — confirm against the OBS timer API).
G.tick = 16
# G.tick scaled by 0.001; used as the per-call time step.
G.tick_mili = G.tick * 0.001
# G.callback is invoked once G.tick_acc has grown past this value.
G.interval_sec = 0.05
# Time accumulated since the last G.callback call.
G.tick_acc = 0
# Name looked up with obs_get_source_by_name().
G.source_name = "Media Source"
# Placeholder until event_loop() stores the real volmeter handle.
G.volmeter = "not yet initialized volmeter instance"
# Function called with G.noise on every reporting interval.
G.callback = output_to_file


def _attach_volmeter():
    """Try to attach a new volmeter to ``G.source_name``; return True on success.

    Nothing is allocated when the source does not exist, and a volmeter that
    fails to attach is destroyed again, so repeated attempts do not leak.
    """
    source = g_obs_get_source_by_name(G.source_name.encode("utf-8"))
    if not source:
        # NULL pointer: no source with that name (yet). Retry on a later tick.
        return False
    try:
        print("setting volmeter")
        volmeter = g_obs_volmeter_create(OBS_FADER_LOG)
        g_obs_volmeter_add_callback(volmeter, volmeter_callback, None)
        if g_obs_volmeter_attach_source(volmeter, source):
            G.volmeter = volmeter
            G.lock = True
            print("Attached to source")
            return True
        # Attach failed: undo the allocation instead of abandoning it.
        g_obs_volmeter_remove_callback(volmeter, volmeter_callback, None)
        g_obs_volmeter_destroy(volmeter)
        return False
    finally:
        # The lookup returned a reference that must be released on every path.
        g_obs_source_release(source)


def event_loop():
    """Timer callback: wait out the start delay, attach once, then report levels.

    Until ``G.duration`` exceeds ``G.start_delay`` each call only advances
    ``G.duration``. After that the volmeter is attached (once), and
    ``G.callback(G.noise)`` is invoked whenever ``G.tick_acc`` exceeds
    ``G.interval_sec``.
    """
    if G.duration <= G.start_delay:
        G.duration += G.tick_mili
        return

    if not G.lock and _attach_volmeter():
        # Attached on this tick; reporting starts with the next one.
        return

    G.tick_acc += G.tick_mili
    if G.tick_acc > G.interval_sec:
        G.callback(G.noise)
        G.tick_acc = 0


def script_unload():
    """Detach the callback and destroy the volmeter, if one was ever created.

    ``G.volmeter`` still holds its placeholder string when the script is
    unloaded before event_loop() stored a handle; passing that string to the
    ctypes functions would raise, so that case is skipped. The handle is
    cleared afterwards so it cannot be destroyed twice.
    """
    volmeter = G.volmeter
    if volmeter is None or isinstance(volmeter, str):
        return
    g_obs_volmeter_remove_callback(volmeter, volmeter_callback, None)
    g_obs_volmeter_destroy(volmeter)
    G.volmeter = None
    print("Removed volmeter & volmeter_callback")


# Register event_loop() as an OBS timer callback with period G.tick.
S.timer_add(event_loop, G.tick)
 

PineapplePro

New Member
Is there any more information on this? I am running this on Windows and cannot get the ctypes find_library call for obs to load on line 6. It only gives me a None error.

Im familiar with python but not ctypes.........

I am trying to make a physical volume meter with an adjustment knob for the active audio source, with clipping info as well.

Any help would be appreciated
 

PineapplePro

New Member
Epic, @upgradeQ, that worked, and I was able to set it to the proper audio source! Now I just need to add a dropdown to set the source on the fly, maybe a file output textbox, and add output to the LED strip / knob console. I'll keep the thread updated with any new code additions or hardware, unless I should create a new thread.
 

PineapplePro

New Member
So I've gotten it working so far! Here is the code:

Python:
import obspython as S  # studio
from types import SimpleNamespace
from ctypes import *
from ctypes.util import find_library
import serial
import serial.tools.list_ports

###############
#OS Setup
###############
import os  # used only here, to choose the loader call for the current platform

if os.name == "nt":
    # Windows: the bare library name resolves.
    obsffi = CDLL("obs")
else:
    # Linux/Mac: locate the shared library through the system search.
    obsffi = CDLL(find_library("obs"))
print("Script OK!")

###############
# Serial device discovery (logged only)
###############
print("Checking Com Ports.......")
ser = ""  # placeholder; refresh_pressed() replaces it with the opened port
ports = serial.tools.list_ports.comports(include_links=False)
for port in ports:
    print(f"Found port: {port.device} !")

###############
# Shared state
###############
stop_loop = False  # while True, event_loop() returns immediately
G = SimpleNamespace()

def wrap(funcname, restype, argtypes):
    """Declare the C signature of ``obsffi.<funcname>`` and expose it as ``g_<funcname>``.

    funcname: name of the function to look up on ``obsffi``.
    restype:  ctypes return type (``None`` for void).
    argtypes: list of ctypes argument types.
    """
    c_function = getattr(obsffi, funcname)
    c_function.argtypes = argtypes
    c_function.restype = restype
    # Publish under a module-level name so callers can use g_<funcname>(...).
    globals()[f"g_{funcname}"] = c_function

class Source(Structure):
    """Opaque ctypes placeholder; only ever used through ``POINTER(Source)``."""

class Volmeter(Structure):
    """Opaque ctypes placeholder; only ever used through ``POINTER(Volmeter)``."""

# C callback type: returns nothing, takes a void* plus three float* arguments.
_float_p = POINTER(c_float)
volmeter_callback_t = CFUNCTYPE(None, c_void_p, _float_p, _float_p, _float_p)

# (function name, return type, argument types) for every libobs call used
# below; each row becomes a module-level g_<name> function via wrap().
for _signature in (
    ("obs_get_source_by_name", POINTER(Source), [c_char_p]),
    ("obs_source_release", None, [POINTER(Source)]),
    ("obs_volmeter_create", POINTER(Volmeter), [c_int]),
    ("obs_volmeter_destroy", None, [POINTER(Volmeter)]),
    (
        "obs_volmeter_add_callback",
        None,
        [POINTER(Volmeter), volmeter_callback_t, c_void_p],
    ),
    (
        "obs_volmeter_remove_callback",
        None,
        [POINTER(Volmeter), volmeter_callback_t, c_void_p],
    ),
    ("obs_volmeter_attach_source", c_bool, [POINTER(Volmeter), POINTER(Source)]),
):
    wrap(*_signature)
 

@volmeter_callback_t
def volmeter_callback(data, mag, peak, input):
    """Volmeter callback: store the first ``peak`` value in ``G.noise``.

    Only element 0 of ``peak`` is read; ``data``, ``mag`` and ``input`` are
    accepted to match the callback signature and otherwise ignored.
    """
    first_peak = peak[0]
    G.noise = float(first_peak)

###############
#Serial Output
###############
def output_to_file(volume):
    """Send one level reading to the serial port ``ser``; always returns 0.

    The reading is written as text followed by ``"x"`` and a NUL byte.
    ``-inf`` is sent as ``-999``; anything at or above -0.09 is sent as ``1``.
    """
    level = -999 if str(volume) == "-inf" else volume
    reading = 1 if float(level) >= -0.09 else level
    ser.write(f"{reading}x\0".encode())
    return 0

###############
#Python Global Variables
###############
     
# Current "source" and "device" settings; overwritten by script_update().
source_name = ""
device_name = ""
# Fader type value passed to obs_volmeter_create() in event_loop().
OBS_FADER_LOG = 2
# Set to True by event_loop() once the volmeter is attached to the source.
G.lock = False
# event_loop() only starts working once G.duration exceeds this value
# (its docstring describes the wait as seconds).
G.start_delay = 3
# Running total that event_loop() advances by G.tick_mili per call.
G.duration = 0
# Last level stored by volmeter_callback(); 999 is the value until the first
# callback arrives (output_to_file() sends anything >= -0.09 as 1).
G.noise = 999
# Period handed to S.timer_add() for event_loop()
# (presumably milliseconds — confirm against the OBS timer API).
G.tick = 10
# G.tick scaled by 0.001; used as the per-call time step.
G.tick_mili = G.tick * 0.001
# G.callback is invoked once G.tick_acc has grown past this value.
G.interval_sec = 0.025
# Time accumulated since the last G.callback call.
G.tick_acc = 0
# Name looked up with obs_get_source_by_name(); update_source() replaces it.
G.source_name = source_name
# Placeholder until event_loop() stores the real volmeter handle.
G.volmeter = "not yet initialized volmeter instance"
# Function called with G.noise on every reporting interval.
G.callback = output_to_file

###############
#Update Audio Device Selection
###############
def update_source():
    """Point the meter at the selected source and (re)start the event_loop timer.

    Does nothing while no source name is selected. Any timer registered by an
    earlier call is removed first, so pressing Connect repeatedly never leaves
    more than one event_loop timer running.
    """
    global stop_loop

    if not source_name:
        # Nothing selected: there is no source to attach to.
        print("No audio source selected")
        return

    G.source_name = source_name
    S.timer_remove(event_loop)
    S.timer_add(event_loop, G.tick)
    stop_loop = False

###############
#Button Call / Initiate Run Sequence
###############
def refresh_pressed(props, prop):
    """Handle the "Connect" button: (re)open the serial port and start output.

    ``props`` and ``prop`` are supplied by the button callback and not used.
    If the port cannot be opened the error is logged and output is not started.
    """
    global ser
    print("Starting Output")

    # Close a port left open by an earlier Connect before opening again;
    # re-opening a device that is still held open can fail. ``ser`` is the
    # "" placeholder until the first successful Connect, hence the hasattr.
    if hasattr(ser, "isOpen") and ser.isOpen():
        ser.close()

    try:
        port = serial.Serial(device_name, 9600, timeout=1)
    except serial.SerialException as err:
        print("Could not open " + repr(device_name) + ": " + str(err))
        return

    ser = port
    ser.flushInput()
    ser.flushOutput()
    print('Connecting to: ' + ser.name)

    update_source()

###############
#Settings Menu Description
###############
def script_description():
    """Return the description text for this script."""
    description = "Volume DB to Com Port Output .\n\n John Leger & Various"
    return description

###############
#Global Variable Update
###############
def script_update(settings):
    """Copy the selected source and serial device names into module globals.

    settings: OBS settings object; the string values stored under "source"
    and "device" are read from it.
    """
    global source_name
    global device_name

    # The "interval" setting is deliberately not read here: nothing in this
    # script consumes it.
    source_name = S.obs_data_get_string(settings, "source")
    device_name = S.obs_data_get_string(settings, "device")
 

def script_defaults(settings):
    """Register 30 as the default for the integer setting "interval"."""
    default_interval = 30
    S.obs_data_set_default_int(settings, "interval", default_interval)

###############
#Settings Layout and Functions
###############
def script_properties():
    """Build the settings panel: source list, COM port list, Connect/Stop buttons.

    The source list contains the names of all current sources whose
    unversioned id is one of the capture ids below; the port list contains
    every device reported by pyserial. Returns the created properties object.
    """
    capture_ids = (
        "asio_input_capture",
        "wasapi_input_capture",
        "wasapi_output_capture",
    )
    props = S.obs_properties_create()

    p = S.obs_properties_add_list(props, "source", "Audio Source", S.OBS_COMBO_TYPE_EDITABLE, S.OBS_COMBO_FORMAT_STRING)
    sources = S.obs_enum_sources()
    if sources is not None:
        for source in sources:
            if S.obs_source_get_unversioned_id(source) in capture_ids:
                name = S.obs_source_get_name(source)
                S.obs_property_list_add_string(p, name, name)
        # The enumerated list must be released once it has been walked.
        S.source_list_release(sources)

    com = S.obs_properties_add_list(props, "device", "COM PORT", S.OBS_COMBO_TYPE_EDITABLE, S.OBS_COMBO_FORMAT_STRING)
    for port in serial.tools.list_ports.comports(include_links=False):
        S.obs_property_list_add_string(com, port.device, port.device)

    S.obs_properties_add_button(props, "button", "Connect", refresh_pressed)
    S.obs_properties_add_button(props, "button2", "Stop", script_unload_button)
    return props


###############
#Main Loop runs after Connect Button Is Pressed
###############
def _attach_volmeter():
    """Try to attach a new volmeter to ``G.source_name``; return True on success.

    Nothing is allocated when the source does not exist, and a volmeter that
    fails to attach is destroyed again, so repeated attempts do not leak.
    """
    source = g_obs_get_source_by_name(G.source_name.encode("utf-8"))
    if not source:
        # NULL pointer: no source with that name (yet). Retry on a later tick.
        return False
    try:
        print("Setting up Meter Instance")
        volmeter = g_obs_volmeter_create(OBS_FADER_LOG)
        g_obs_volmeter_add_callback(volmeter, volmeter_callback, None)
        if g_obs_volmeter_attach_source(volmeter, source):
            G.volmeter = volmeter
            G.lock = True
            print("Attached to source:" + G.source_name + "!")
            return True
        # Attach failed: undo the allocation instead of abandoning it.
        g_obs_volmeter_remove_callback(volmeter, volmeter_callback, None)
        g_obs_volmeter_destroy(volmeter)
        return False
    finally:
        # The lookup returned a reference that must be released on every path.
        g_obs_source_release(source)


def event_loop():
    """Timer callback: wait out the start delay, attach once, then report levels.

    Returns immediately while ``stop_loop`` is set. Until ``G.duration``
    exceeds ``G.start_delay`` each call only advances ``G.duration``. After
    that the volmeter is attached (once), and ``G.callback(G.noise)`` is
    invoked whenever ``G.tick_acc`` exceeds ``G.interval_sec``.
    """
    if stop_loop:
        return

    if G.duration <= G.start_delay:
        G.duration += G.tick_mili
        return

    if not G.lock and _attach_volmeter():
        # Attached on this tick; reporting starts with the next one.
        return

    G.tick_acc += G.tick_mili
    if G.tick_acc > G.interval_sec:
        G.callback(G.noise)
        G.tick_acc = 0

###############
#Restart / Kill Script
###############
def _stop_output():
    """Stop the timer, tear down the volmeter (once) and close the serial port."""
    global stop_loop
    stop_loop = True
    S.timer_remove(event_loop)

    # G.volmeter is a placeholder string until a handle has been stored, and
    # None after teardown; only a real handle may reach the ctypes calls.
    volmeter = G.volmeter
    if volmeter is not None and not isinstance(volmeter, str):
        g_obs_volmeter_remove_callback(volmeter, volmeter_callback, None)
        g_obs_volmeter_destroy(volmeter)
        G.volmeter = None
        # Allow a later Connect to attach a fresh volmeter.
        G.lock = False
        print("Removed volmeter & volmeter_callback")

    # ``ser`` is the "" placeholder until Connect has opened a port.
    if hasattr(ser, "isOpen") and ser.isOpen():
        ser.flushInput()
        ser.flushOutput()
        ser.close()


def script_unload():
    """Script unload hook: stop output and release the volmeter and port."""
    _stop_output()


def script_unload_button(pad, pad2):
    """"Stop" button handler; both button arguments are ignored."""
    _stop_output()

Arduino Code: uses 8 Neopixels on 1 Pin with USB Serial Input

C:
#include <Adafruit_NeoPixel.h>
#ifdef __AVR__
#include <avr/power.h> // Required for 16 MHz Adafruit Trinket
#endif

// Which pin on the Arduino is connected to the NeoPixels?
#define PIN        6 // On Trinket or Gemma, suggest changing this to 1

// How many NeoPixels are attached to the Arduino?
#define NUMPIXELS 8 // Popular NeoPixel ring size

// Create Neopixel Reference
Adafruit_NeoPixel pixels(NUMPIXELS, PIN, NEO_GRB + NEO_KHZ800);

// Receive buffer for one serial reading. It starts empty: String(100) would
// build the text "100", not a 100-character buffer, and buffer_fill()
// overwrites the contents before every use anyway.
String inString;
int dbLevel = -999; // not referenced by the functions in this sketch
#define DELAYVAL 1 // Time (in milliseconds) to pause between pixels

void buffer_fill() { //Fill Serial Buffer
  inString = "";
  inString = Serial.readStringUntil('x');
  if (inString != "") {
    //Serial.print("Recieved:");
    //Serial.println(inString);//Acknowledge Input
    pixels.clear(); // Set all pixel colors to 'off'
    // The first NeoPixel in a strand is #0, second is 1, all the way up
    // to the count of pixels minus one.
    float fled = inString.toFloat();
    //int ledLvl = map(inString.toFloat(), -70, -0.3, -1, 7);
    //Serial.println(ledLvl);
    // pixels.Color() takes RGB values, from 0,0,0 up to 255,255,255

    if (fled >= 0.09) {
      for (int x = 0; x < 8; x++) {
        pixels.setPixelColor(x, pixels.Color(110, 0, 0));
  }
      pixels.show();  // Send the updated pixel colors to the hardware and delay to show Clip.
      Serial.flush();
      delay(750);
    }
    
    if (fled < -60) {
      for (int x = 0; x < 7; x++) {
        pixels.setPixelColor(x, pixels.Color(0, 0, 0));
      }
    }

    if (fled > -60 && fled < -50) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      for (int x = 1; x < 7; x++) {
        pixels.setPixelColor(x, pixels.Color(0, 0, 0));
      }
    }

    if (fled > -50 && fled < -40) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      for (int x = 2; x < 7; x++) {
        pixels.setPixelColor(x, pixels.Color(0, 0, 0));
      }
    }
    
    if (fled > -40 && fled < -30) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      pixels.setPixelColor(2, pixels.Color(0, 90, 0));
      for (int x = 3; x < 7; x++) {
        pixels.setPixelColor(x, pixels.Color(0, 0, 0));
      }
    }
    
    if (fled > -30 && fled < -20) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      pixels.setPixelColor(2, pixels.Color(0, 90, 0));
      pixels.setPixelColor(3, pixels.Color(0, 90, 0));
      for (int x = 4; x < 7; x++) {
        pixels.setPixelColor(x, pixels.Color(0, 0, 0));
      }
    }
    
    if (fled > -20 && fled < -13) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      pixels.setPixelColor(2, pixels.Color(0, 90, 0));
      pixels.setPixelColor(3, pixels.Color(0, 90, 0));
      pixels.setPixelColor(4, pixels.Color(0, 90, 0));
      for (int x = 5; x < 7; x++) {
        pixels.setPixelColor(x, pixels.Color(0, 0, 0));
      }
    }

    if (fled > -13 && fled < -7) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      pixels.setPixelColor(2, pixels.Color(0, 90, 0));
      pixels.setPixelColor(3, pixels.Color(0, 90, 0));
      pixels.setPixelColor(4, pixels.Color(0, 90, 0));
      pixels.setPixelColor(5, pixels.Color(90, 90, 0));
      pixels.setPixelColor(6, pixels.Color(0, 0, 0));
      pixels.setPixelColor(7, pixels.Color(0, 0, 0));
    }

    if (fled > -7 && fled < -3) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      pixels.setPixelColor(2, pixels.Color(0, 90, 0));
      pixels.setPixelColor(3, pixels.Color(0, 90, 0));
      pixels.setPixelColor(4, pixels.Color(0, 90, 0));
      pixels.setPixelColor(5, pixels.Color(100, 90, 0));
      pixels.setPixelColor(6, pixels.Color(100, 90, 0));
      pixels.setPixelColor(7, pixels.Color(0, 0, 0));
    }

    if (fled > -3 && fled < -0.09) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      pixels.setPixelColor(2, pixels.Color(0, 90, 0));
      pixels.setPixelColor(3, pixels.Color(0, 90, 0));
      pixels.setPixelColor(4, pixels.Color(0, 90, 0));
      pixels.setPixelColor(5, pixels.Color(100, 90, 0));
      pixels.setPixelColor(6, pixels.Color(100, 90, 0));
      pixels.setPixelColor(7, pixels.Color(95, 0, 0));
    }
    pixels.show();   // Send the updated pixel colors to the hardware.
    Serial.flush();  // Clear Serial.
  } else {

  }
}

void setup() {
  // Serial link to the host at 9600 baud.
  Serial.begin(9600);
  pixels.begin(); // INITIALIZE NeoPixel strip object
  pixels.setBrightness(64);
  // setTimeout() takes a whole number of milliseconds, so pass 33 (about
  // 30 Hz) rather than a fractional literal that would be truncated.
  Serial.setTimeout(33);
}

void loop() {
  // Each pass reads at most one serial reading and redraws the LEDs.
  // USBMIDI.poll();
  buffer_fill();
  delay(1); // 1 ms pause between passes
}
 
Last edited:

PineapplePro

New Member
Updated a few things for timing that does not block the encoder, and for better serial functionality! Here is the code:

Python:
import obspython as S  # studio
from types import SimpleNamespace
from ctypes import *
from ctypes.util import find_library
import serial
import serial.tools.list_ports

###############
#OS Setup
###############
import os  # used only here, to choose the loader call for the current platform

if os.name == "nt":
    # Windows: the bare library name resolves.
    obsffi = CDLL("obs")
else:
    # Linux/Mac: locate the shared library through the system search.
    obsffi = CDLL(find_library("obs"))
print("Script OK!")

###############
# Serial device discovery (logged only)
###############
print("Checking Com Ports.......")
ser = ""  # placeholder; refresh_pressed() replaces it with the opened port
ports = serial.tools.list_ports.comports(include_links=False)
for port in ports:
    print(f"Found port: {port.device} !")

###############
# Shared state
###############
stop_loop = False  # while True, event_loop() returns immediately
G = SimpleNamespace()

def wrap(funcname, restype, argtypes):
    """Declare the C signature of ``obsffi.<funcname>`` and expose it as ``g_<funcname>``.

    funcname: name of the function to look up on ``obsffi``.
    restype:  ctypes return type (``None`` for void).
    argtypes: list of ctypes argument types.
    """
    c_function = getattr(obsffi, funcname)
    c_function.argtypes = argtypes
    c_function.restype = restype
    # Publish under a module-level name so callers can use g_<funcname>(...).
    globals()[f"g_{funcname}"] = c_function

class Source(Structure):
    """Opaque ctypes placeholder; only ever used through ``POINTER(Source)``."""

class Volmeter(Structure):
    """Opaque ctypes placeholder; only ever used through ``POINTER(Volmeter)``."""

# C callback type: returns nothing, takes a void* plus three float* arguments.
_float_p = POINTER(c_float)
volmeter_callback_t = CFUNCTYPE(None, c_void_p, _float_p, _float_p, _float_p)

# (function name, return type, argument types) for every libobs call used
# below; each row becomes a module-level g_<name> function via wrap().
for _signature in (
    ("obs_get_source_by_name", POINTER(Source), [c_char_p]),
    ("obs_source_release", None, [POINTER(Source)]),
    ("obs_volmeter_create", POINTER(Volmeter), [c_int]),
    ("obs_volmeter_destroy", None, [POINTER(Volmeter)]),
    (
        "obs_volmeter_add_callback",
        None,
        [POINTER(Volmeter), volmeter_callback_t, c_void_p],
    ),
    (
        "obs_volmeter_remove_callback",
        None,
        [POINTER(Volmeter), volmeter_callback_t, c_void_p],
    ),
    ("obs_volmeter_attach_source", c_bool, [POINTER(Volmeter), POINTER(Source)]),
):
    wrap(*_signature)
    

@volmeter_callback_t
def volmeter_callback(data, mag, peak, input):
    """Volmeter callback: store the first ``peak`` value in ``G.noise``.

    Only element 0 of ``peak`` is read; ``data``, ``mag`` and ``input`` are
    accepted to match the callback signature and otherwise ignored.
    """
    first_peak = peak[0]
    G.noise = float(first_peak)

###############
#Serial Output
###############
def output_to_file(volume):
    """Send one level reading to the serial port ``ser``; always returns 0.

    The reading is written as text followed by a newline and a NUL byte.
    ``-inf`` is sent as ``-999``; anything at or above -0.09 is sent as ``1``.
    """
    level = -999 if str(volume) == "-inf" else volume
    reading = 1 if float(level) >= -0.09 else level
    ser.write(f"{reading}\n\0".encode())
    return 0

###############
#Python Global Variables
###############
        
# Current "source" and "device" settings; overwritten by script_update().
source_name = ""
device_name = ""
# Fader type value passed to obs_volmeter_create() in event_loop().
OBS_FADER_LOG = 2
# Set to True by event_loop() once the volmeter is attached to the source.
G.lock = False
# event_loop() only starts working once G.duration exceeds this value
# (its docstring describes the wait as seconds).
G.start_delay = 3
# Running total that event_loop() advances by G.tick_mili per call.
G.duration = 0
# Last level stored by volmeter_callback(); 999 is the value until the first
# callback arrives (output_to_file() sends anything >= -0.09 as 1).
G.noise = 999
# Period handed to S.timer_add() for event_loop()
# (presumably milliseconds — confirm against the OBS timer API).
G.tick = 10
# G.tick scaled by 0.001; used as the per-call time step.
G.tick_mili = G.tick * 0.001
# G.callback is invoked once G.tick_acc has grown past this value.
G.interval_sec = 0.025
# Time accumulated since the last G.callback call.
G.tick_acc = 0
# Name looked up with obs_get_source_by_name(); update_source() replaces it.
G.source_name = source_name
# Placeholder until event_loop() stores the real volmeter handle.
G.volmeter = "not yet initialized volmeter instance"
# Function called with G.noise on every reporting interval.
G.callback = output_to_file

###############
#Update Audio Device Selection
###############
def update_source():
    """Point the meter at the selected source and (re)start the event_loop timer.

    Does nothing while no source name is selected. Any timer registered by an
    earlier call is removed first, so pressing Connect repeatedly never leaves
    more than one event_loop timer running.
    """
    global stop_loop

    if not source_name:
        # Nothing selected: there is no source to attach to.
        print("No audio source selected")
        return

    G.source_name = source_name
    S.timer_remove(event_loop)
    S.timer_add(event_loop, G.tick)
    stop_loop = False

###############
#Button Call / Initiate Run Sequence
###############
def refresh_pressed(props, prop):
    """Handle the "Connect" button: (re)open the serial port and start output.

    ``props`` and ``prop`` are supplied by the button callback and not used.
    If the port cannot be opened the error is logged and output is not started.
    """
    global ser
    print("Starting Output")

    # Close a port left open by an earlier Connect before opening again;
    # re-opening a device that is still held open can fail. ``ser`` is the
    # "" placeholder until the first successful Connect, hence the hasattr.
    if hasattr(ser, "isOpen") and ser.isOpen():
        ser.close()

    try:
        port = serial.Serial(device_name, 9600, timeout=1)
    except serial.SerialException as err:
        print("Could not open " + repr(device_name) + ": " + str(err))
        return

    ser = port
    ser.flushInput()
    ser.flushOutput()
    print('Connecting to: ' + ser.name)

    update_source()

###############
#Settings Menu Description
###############
def script_description():
    """Return the description text for this script."""
    description = "Volume DB to Com Port Output as Text String Click Refresh Script to Change/Search Com Ports or if any errors .\n\n John Leger, upgradeQ & Various"
    return description

###############
#Global Variable Update
###############
def script_update(settings):
    """Copy the selected source and serial device names into module globals.

    settings: OBS settings object; the string values stored under "source"
    and "device" are read from it.
    """
    global source_name
    global device_name

    # The "interval" setting is deliberately not read here: nothing in this
    # script consumes it.
    source_name = S.obs_data_get_string(settings, "source")
    device_name = S.obs_data_get_string(settings, "device")
    

def script_defaults(settings):
    """Register 30 as the default for the integer setting "interval"."""
    default_interval = 30
    S.obs_data_set_default_int(settings, "interval", default_interval)

###############
#Settings Layout and Functions
###############
def script_properties():
    """Build the settings panel: source list, COM port list, Connect/Stop buttons.

    The source list contains the names of all current sources whose
    unversioned id is one of the capture ids below; the port list contains
    every device reported by pyserial. Returns the created properties object.
    """
    capture_ids = (
        "asio_input_capture",
        "wasapi_input_capture",
        "wasapi_output_capture",
    )
    props = S.obs_properties_create()

    p = S.obs_properties_add_list(props, "source", "Audio Source", S.OBS_COMBO_TYPE_EDITABLE, S.OBS_COMBO_FORMAT_STRING)
    sources = S.obs_enum_sources()
    if sources is not None:
        for source in sources:
            if S.obs_source_get_unversioned_id(source) in capture_ids:
                name = S.obs_source_get_name(source)
                S.obs_property_list_add_string(p, name, name)
        # The enumerated list must be released once it has been walked.
        S.source_list_release(sources)

    com = S.obs_properties_add_list(props, "device", "COM PORT", S.OBS_COMBO_TYPE_EDITABLE, S.OBS_COMBO_FORMAT_STRING)
    for port in serial.tools.list_ports.comports(include_links=False):
        S.obs_property_list_add_string(com, port.device, port.device)

    S.obs_properties_add_button(props, "button", "Connect", refresh_pressed)
    S.obs_properties_add_button(props, "button2", "Stop", script_unload_button)
    return props


###############
#Main Loop runs after Connect Button Is Pressed
###############
def _attach_volmeter():
    """Try to attach a new volmeter to ``G.source_name``; return True on success.

    Nothing is allocated when the source does not exist, and a volmeter that
    fails to attach is destroyed again, so repeated attempts do not leak.
    """
    source = g_obs_get_source_by_name(G.source_name.encode("utf-8"))
    if not source:
        # NULL pointer: no source with that name (yet). Retry on a later tick.
        return False
    try:
        print("Setting up Meter Instance")
        volmeter = g_obs_volmeter_create(OBS_FADER_LOG)
        g_obs_volmeter_add_callback(volmeter, volmeter_callback, None)
        if g_obs_volmeter_attach_source(volmeter, source):
            G.volmeter = volmeter
            G.lock = True
            print("Attached to source:" + G.source_name + "!")
            return True
        # Attach failed: undo the allocation instead of abandoning it.
        g_obs_volmeter_remove_callback(volmeter, volmeter_callback, None)
        g_obs_volmeter_destroy(volmeter)
        return False
    finally:
        # The lookup returned a reference that must be released on every path.
        g_obs_source_release(source)


def event_loop():
    """Timer callback: wait out the start delay, attach once, then report levels.

    Returns immediately while ``stop_loop`` is set. Until ``G.duration``
    exceeds ``G.start_delay`` each call only advances ``G.duration``. After
    that the volmeter is attached (once), and ``G.callback(G.noise)`` is
    invoked whenever ``G.tick_acc`` exceeds ``G.interval_sec``.
    """
    if stop_loop:
        return

    if G.duration <= G.start_delay:
        G.duration += G.tick_mili
        return

    if not G.lock and _attach_volmeter():
        # Attached on this tick; reporting starts with the next one.
        return

    G.tick_acc += G.tick_mili
    if G.tick_acc > G.interval_sec:
        G.callback(G.noise)
        G.tick_acc = 0

###############
#Restart / Kill Script
###############
def _stop_output():
    """Stop the timer, tear down the volmeter (once) and close the serial port."""
    global stop_loop
    stop_loop = True
    S.timer_remove(event_loop)

    # G.volmeter is a placeholder string until a handle has been stored, and
    # None after teardown; only a real handle may reach the ctypes calls.
    volmeter = G.volmeter
    if volmeter is not None and not isinstance(volmeter, str):
        g_obs_volmeter_remove_callback(volmeter, volmeter_callback, None)
        g_obs_volmeter_destroy(volmeter)
        G.volmeter = None
        # Allow a later Connect to attach a fresh volmeter.
        G.lock = False
        print("Removed volmeter & volmeter_callback")

    # ``ser`` is the "" placeholder until Connect has opened a port.
    if hasattr(ser, "isOpen") and ser.isOpen():
        ser.flushInput()
        ser.flushOutput()
        ser.close()


def script_unload():
    """Script unload hook: stop output and release the volmeter and port."""
    _stop_output()


def script_unload_button(pad, pad2):
    """"Stop" button handler; both button arguments are ignored."""
    _stop_output()

Arduino Code: uses 8 Neopixels on 1 Pin with USB Serial Input

C:
#include <Adafruit_NeoPixel.h>
#ifdef __AVR__
#include <avr/power.h> // Required for 16 MHz Adafruit Trinket
#endif

// Which pin on the Arduino is connected to the NeoPixels?
#define PIN        6 // On Trinket or Gemma, suggest changing this to 1

// How many NeoPixels are attached to the Arduino?
#define NUMPIXELS 8 // Popular NeoPixel ring size

// Create Neopixel Reference
Adafruit_NeoPixel pixels(NUMPIXELS, PIN, NEO_GRB + NEO_KHZ800);

// Receive buffer for one serial reading. It starts empty: String(100) would
// build the text "100", not a 100-character buffer, and buffer_fill()
// overwrites the contents before every use anyway.
String inString;
int dbLevel = -999; // not referenced by the functions in this sketch
#define DELAYVAL 1 // Time (in milliseconds) to pause between pixels

void buffer_fill() { //Fill Serial Buffer
  inString = "";
  inString = Serial.readStringUntil('\n');
  if (inString != "") {
    //Serial.print("Recieved:");
    //Serial.println(inString);//Acknowledge Input
    pixels.clear(); // Set all pixel colors to 'off'
    // The first NeoPixel in a strand is #0, second is 1, all the way up
    // to the count of pixels minus one.
    float fled = inString.toFloat();
    //int ledLvl = map(inString.toFloat(), -70, -0.09, -1, 7);
    //Serial.println(ledLvl);
    // pixels.Color() takes RGB values, from 0,0,0 up to 255,255,255

    if (fled >= 0.09) {
      for (int x = 0; x < 8; x++) {
        pixels.setPixelColor(x, pixels.Color(110, 0, 0));
  }
      pixels.show();  // Send the updated pixel colors to the hardware and delay to show Clip.
      
      
    }
    
    if (fled < -60) {
      for (int x = 0; x < 7; x++) {
        pixels.setPixelColor(x, pixels.Color(0, 0, 0));
      }
    }

    if (fled > -60 && fled < -50) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      for (int x = 1; x < 7; x++) {
        pixels.setPixelColor(x, pixels.Color(0, 0, 0));
      }
    }

    if (fled > -50 && fled < -40) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      for (int x = 2; x < 7; x++) {
        pixels.setPixelColor(x, pixels.Color(0, 0, 0));
      }
    }
    
    if (fled > -40 && fled < -30) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      pixels.setPixelColor(2, pixels.Color(0, 90, 0));
      for (int x = 3; x < 7; x++) {
        pixels.setPixelColor(x, pixels.Color(0, 0, 0));
      }
    }
    
    if (fled > -30 && fled < -20) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      pixels.setPixelColor(2, pixels.Color(0, 90, 0));
      pixels.setPixelColor(3, pixels.Color(0, 90, 0));
      for (int x = 4; x < 7; x++) {
        pixels.setPixelColor(x, pixels.Color(0, 0, 0));
      }
    }
    
    if (fled > -20 && fled < -13) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      pixels.setPixelColor(2, pixels.Color(0, 90, 0));
      pixels.setPixelColor(3, pixels.Color(0, 90, 0));
      pixels.setPixelColor(4, pixels.Color(0, 90, 0));
      for (int x = 5; x < 7; x++) {
        pixels.setPixelColor(x, pixels.Color(0, 0, 0));
      }
    }

    if (fled > -13 && fled < -7) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      pixels.setPixelColor(2, pixels.Color(0, 90, 0));
      pixels.setPixelColor(3, pixels.Color(0, 90, 0));
      pixels.setPixelColor(4, pixels.Color(0, 90, 0));
      pixels.setPixelColor(5, pixels.Color(90, 90, 0));
      pixels.setPixelColor(6, pixels.Color(0, 0, 0));
      pixels.setPixelColor(7, pixels.Color(0, 0, 0));
    }

    if (fled > -7 && fled < -3) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      pixels.setPixelColor(2, pixels.Color(0, 90, 0));
      pixels.setPixelColor(3, pixels.Color(0, 90, 0));
      pixels.setPixelColor(4, pixels.Color(0, 90, 0));
      pixels.setPixelColor(5, pixels.Color(100, 90, 0));
      pixels.setPixelColor(6, pixels.Color(100, 90, 0));
      pixels.setPixelColor(7, pixels.Color(0, 0, 0));
    }

    if (fled > -3 && fled < -0.09) {
      pixels.setPixelColor(0, pixels.Color(0, 90, 0));
      pixels.setPixelColor(1, pixels.Color(0, 90, 0));
      pixels.setPixelColor(2, pixels.Color(0, 90, 0));
      pixels.setPixelColor(3, pixels.Color(0, 90, 0));
      pixels.setPixelColor(4, pixels.Color(0, 90, 0));
      pixels.setPixelColor(5, pixels.Color(100, 90, 0));
      pixels.setPixelColor(6, pixels.Color(100, 90, 0));
      pixels.setPixelColor(7, pixels.Color(95, 0, 0));
    }
    pixels.show();   // Send the updated pixel colors to the hardware.
    Serial.flush();  // Clear Serial.
  } else {

  }
}

void setup() {
  // Serial link to the host at 9600 baud.
  Serial.begin(9600);
  pixels.begin(); // INITIALIZE NeoPixel strip object
  pixels.setBrightness(64);
  // setTimeout() takes a whole number of milliseconds, so pass 33 (about
  // 30 Hz) rather than a fractional literal that would be truncated.
  Serial.setTimeout(33);
}

void loop() {
  // Each pass reads at most one serial reading and redraws the LEDs.
  // USBMIDI.poll();
  buffer_fill();
  delay(1); // 1 ms pause between passes
}
 

ItzDragon

New Member
Hey, I was trying to do the same as you and I found something here: https://github.com/upgradeQ/OBS-Stu...Examples-of-API#access-source-db-volume-level

It contains this code: https://github.com/upgradeQ/OBS-Stu...es-of-API/blob/master/src/volmeter_via_ffi.py

I was getting errors, but after some modification I got output, which is printed to the script log.
Here is the modified code:

Code:
import obspython as S  # studio
from types import SimpleNamespace
from ctypes import *
from ctypes.util import find_library

import os  # used only here, to pick the library name per platform

# find_library("obs") can come back as None on Windows, which CDLL cannot
# load; the bare name "obs" is what resolves there. Other platforms keep the
# find_library() result unchanged.
_libobs_name = find_library("obs")
if _libobs_name is None and os.name == "nt":
    _libobs_name = "obs"

# Handle to libobs; wrap() looks functions up on it.
obsffi = CDLL(_libobs_name)
# Shared mutable state for the whole script.
G = SimpleNamespace()


def wrap(funcname, restype, argtypes):
    """Declare the C signature of ``obsffi.<funcname>`` and expose it as ``g_<funcname>``.

    funcname: name of the function to look up on ``obsffi``.
    restype:  ctypes return type (``None`` for void).
    argtypes: list of ctypes argument types.
    """
    c_function = getattr(obsffi, funcname)
    c_function.argtypes = argtypes
    c_function.restype = restype
    # Publish under a module-level name so callers can use g_<funcname>(...).
    globals()[f"g_{funcname}"] = c_function


class Source(Structure):
    """Opaque ctypes placeholder; only ever used through ``POINTER(Source)``."""


class Volmeter(Structure):
    """Opaque ctypes placeholder; only ever used through ``POINTER(Volmeter)``."""


# C callback type: returns nothing, takes a void* plus three float* arguments.
_float_p = POINTER(c_float)
volmeter_callback_t = CFUNCTYPE(None, c_void_p, _float_p, _float_p, _float_p)

# (function name, return type, argument types) for every libobs call used
# below; each row becomes a module-level g_<name> function via wrap().
for _signature in (
    ("obs_get_source_by_name", POINTER(Source), [c_char_p]),
    ("obs_source_release", None, [POINTER(Source)]),
    ("obs_volmeter_create", POINTER(Volmeter), [c_int]),
    ("obs_volmeter_destroy", None, [POINTER(Volmeter)]),
    (
        "obs_volmeter_add_callback",
        None,
        [POINTER(Volmeter), volmeter_callback_t, c_void_p],
    ),
    (
        "obs_volmeter_remove_callback",
        None,
        [POINTER(Volmeter), volmeter_callback_t, c_void_p],
    ),
    ("obs_volmeter_attach_source", c_bool, [POINTER(Volmeter), POINTER(Source)]),
):
    wrap(*_signature)


@volmeter_callback_t
def volmeter_callback(data, mag, peak, input):
    """Volmeter callback: store the first ``peak`` value in ``G.noise``.

    Only element 0 of ``peak`` is read; ``data``, ``mag`` and ``input`` are
    accepted to match the callback signature and otherwise ignored.
    """
    first_peak = peak[0]
    G.noise = float(first_peak)


def output_to_file(volume):
    """Print ``volume`` to the script log (despite the name, no file is written)."""
    level = volume
    print(level)


# Fader type value passed to obs_volmeter_create() in event_loop().
OBS_FADER_LOG = 2
# Set to True by event_loop() once the volmeter is attached to the source.
G.lock = False
# event_loop() only starts working once G.duration exceeds this value
# (its docstring describes the wait as seconds).
G.start_delay = 3
# Running total that event_loop() advances by G.tick_mili per call.
G.duration = 0
# Last level stored by volmeter_callback(); 999 is what is reported until
# the first callback arrives.
G.noise = 999
# Period handed to S.timer_add() for event_loop()
# (presumably milliseconds — confirm against the OBS timer API).
G.tick = 16
# G.tick scaled by 0.001; used as the per-call time step.
G.tick_mili = G.tick * 0.001
# G.callback is invoked once G.tick_acc has grown past this value.
G.interval_sec = 0.05
# Time accumulated since the last G.callback call.
G.tick_acc = 0
# Name looked up with obs_get_source_by_name().
G.source_name = "Mic"
# Placeholder until event_loop() stores the real volmeter handle.
G.volmeter = "not yet initialized volmeter instance"
# Function called with G.noise on every reporting interval.
G.callback = output_to_file


def _attach_volmeter():
    """Try to attach a new volmeter to ``G.source_name``; return True on success.

    Nothing is allocated when the source does not exist, and a volmeter that
    fails to attach is destroyed again, so repeated attempts do not leak.
    """
    source = g_obs_get_source_by_name(G.source_name.encode("utf-8"))
    if not source:
        # NULL pointer: no source with that name (yet). Retry on a later tick.
        return False
    try:
        print("setting volmeter")
        volmeter = g_obs_volmeter_create(OBS_FADER_LOG)
        g_obs_volmeter_add_callback(volmeter, volmeter_callback, None)
        if g_obs_volmeter_attach_source(volmeter, source):
            G.volmeter = volmeter
            G.lock = True
            print("Attached to source")
            return True
        # Attach failed: undo the allocation instead of abandoning it.
        g_obs_volmeter_remove_callback(volmeter, volmeter_callback, None)
        g_obs_volmeter_destroy(volmeter)
        return False
    finally:
        # The lookup returned a reference that must be released on every path.
        g_obs_source_release(source)


def event_loop():
    """Timer callback: wait out the start delay, attach once, then report levels.

    Until ``G.duration`` exceeds ``G.start_delay`` each call only advances
    ``G.duration``. After that the volmeter is attached (once), and
    ``G.callback(G.noise)`` is invoked whenever ``G.tick_acc`` exceeds
    ``G.interval_sec``.
    """
    if G.duration <= G.start_delay:
        G.duration += G.tick_mili
        return

    if not G.lock and _attach_volmeter():
        # Attached on this tick; reporting starts with the next one.
        return

    G.tick_acc += G.tick_mili
    if G.tick_acc > G.interval_sec:
        G.callback(G.noise)
        G.tick_acc = 0


def script_unload():
    """Detach the callback and destroy the volmeter, if one was ever created.

    ``G.volmeter`` still holds its placeholder string when the script is
    unloaded before event_loop() stored a handle; passing that string to the
    ctypes functions would raise, so that case is skipped. The handle is
    cleared afterwards so it cannot be destroyed twice.
    """
    volmeter = G.volmeter
    if volmeter is None or isinstance(volmeter, str):
        return
    g_obs_volmeter_remove_callback(volmeter, volmeter_callback, None)
    g_obs_volmeter_destroy(volmeter)
    G.volmeter = None
    print("Removed volmeter & volmeter_callback")


# Register event_loop() as an OBS timer callback with period G.tick.
S.timer_add(event_loop, G.tick)

enjoi
 

josephsmendoza

New Member
There is all manner of code here that I do not understand. Is there a version of this script that can be used as a library? for example:
Python:
# Wished-for usage, not working code: `obsaudio` is the library being asked
# for and is not provided anywhere in this thread.
import obsaudio
sources=obsaudio.getSources()
# Callback that prints each level it is given.
def printAudio(level):
    print(level)
# Register the callback for the first entry of `sources`.
obsaudio.registerCallback(sources[0],printAudio)
 
Top