Before trying to understand this document or avplumber's source code, read the entire README (except the node descriptions).
The source code of all nodes resides in the src/nodes/ directory. Each .cpp file corresponds to one or more nodes.
There are also subdirectories, included conditionally depending on flags set in the Makefile. For example, if your node is useful only for debugging purposes (e.g. it intentionally corrupts packets to test recovery code), you should put it in the debug/ directory.
Directories are scanned using a shell script (generate_node_list) and a find command inside the Makefile when make is run. There is no index file in the sources (it is generated automatically when compiling), so you don't need to update anything after adding a .cpp file to the src/nodes/ directory or a subdirectory.
Each node source file begins with:
#include "node_common.hpp"
which includes commonly used headers and the DECLNODE macros (see below).
Then classes or class templates of nodes follow.
After a class or class template is defined, DECLNODE macros need to be used to enable node creation and include it in the autogenerated index.
They serve 2 functions:
- the generate_node_list script searches for them in all node sources and generates the node index
- they are parsed by the C++ preprocessor (as every macro in C/C++ source code), generating the code needed for node creation
Available macros:
- DECLNODE(nodetype, classname) - declares a node which will be created when the type parameter in the node's JSON object equals nodetype. The node must be defined in the classname class.
- DECLNODE_ATD(nodetype, tplname) - declares a node which will be created when the type parameter in the node's JSON object equals nodetype. The node template must be defined in the tplname class template. It will be specialized for a single template argument being the type of data processed in this node. If the node has both input(s) and output(s), input and output types must be the same. ATD means automatic type detection.
- DECLNODE_ALIAS(nodetype, classname) - declares a different type name for a node already declared using DECLNODE
- DECLNODE_ATD_ALIAS(nodetype, tplname) - declares a different type name for a node already declared using DECLNODE_ATD
Arguments for blocking nodes:
- good for blocking operations on the file system & network
- good for computationally expensive operations: video & audio encoding, decoding and processing
Arguments for non-blocking nodes:
- less overhead (no context switch when passing data between nodes)
- more predictable timing
avplumber supported only blocking nodes for a very long time, thus even nodes that would benefit from being non-blocking are still implemented as blocking.
Every node class needs to be derived from Node. If the node is blocking, the only virtual function that absolutely needs to be overridden is process. Example: a firewall node which doesn't allow packets or frames without PTS to pass:
    virtual void process() {
        T data = this->source_->get();
        if (!data.pts().isValid()) return;
        this->sink_->put(data);
    }
This method will be executed in the node thread's main loop - NodeWrapper::threadFunction (src/graph_mgmt.cpp).
If the node is non-blocking, it should be derived from NonBlockingNode and the function to override is processNonBlocking. See src/nodes/realtime.cpp for an example.
If tick_source is not specified, processNonBlocking is called once with argument ticks = false when starting the node and it is the node's responsibility to add events to the event loop. NonBlockingNode defines some convenience functions (wrappers for EventLoop's methods) that will call processNonBlocking again when a certain event happens:
- processWhenSignalled(Event &event) - when event is signalled, for example a queue has data available or space available
- sleepAndProcess(int ms) - after ms milliseconds
- scheduleProcess(av::Timestamp when) - at the when timestamp from wallclock. Use wallclock.ts() to get the current timestamp.
- yieldAndProcess() - puts processNonBlocking at the end of the event loop.
If tick_source is specified, processNonBlocking is called with argument ticks = true on each tick. You can still use the functions mentioned above if you want to run processNonBlocking sooner than the next tick, but usually it shouldn't be needed.
Example of using signals:
    virtual void processNonBlocking(EventLoop& evl, bool ticks) {
        T* dataptr = this->source_->peek(0);
        if (dataptr==nullptr) {
            // no data available in queue
            if (!ticks) {
                // retry when we have packet in source queue
                this->processWhenSignalled(this->edgeSource()->edge()->producedEvent());
            }
            // if ticks==true, processNonBlocking will be called automatically with next tick
            // no need to schedule it
            return;
        }
        T &data = *dataptr;
        // ...
        // process the data
        // ...
        // put it in the sink queue:
        if (this->sink_->put(data, true)) {
            // put returned true, success, remove this packet from the source queue
            this->source_->pop();
            if (!ticks) {
                // process next packet
                this->yieldAndProcess();
            }
        } else {
            // put returned false, no space in queue
            if (!ticks) {
                // retry when we have space in sink
                // note that the whole processNonBlocking will be run again
                // so it is not a good idea to do it if the processing is stateful (e.g. encoding or decoding)
                this->processWhenSignalled(this->edgeSink()->edge()->consumedEvent());
            }
        }
    }
If your processing is stateful, you can:
- write your node as a regular blocking node, or
- use the underlying function evl.asyncWaitAndExecute directly. Pass the packet to put in the queue in the lambda's captured variables, as a value (not a reference). Do not capture this or this->shared_from_this() because it can lead to use-after-free memory corruption or a memory leak. Capture a weak pointer (std::weak_ptr) instead.
Note that the 2 options mentioned above create a hidden queue of size 1 on the thread's stack or in the closure. The remaining options are:
- check for space in the output queue before doing processing
- undo data modifications when the put call fails (that's what we do in nodes/speed.cpp)
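The advice above about capturing a weak pointer instead of this can be illustrated with plain standard C++. The following is a minimal sketch, not avplumber code: ToyNode, deferPut and weakCaptureDemo are hypothetical names, and std::function stands in for whatever closure type evl.asyncWaitAndExecute actually accepts. The closure owns the packet by value and reaches the node only through std::weak_ptr, so it neither keeps the node alive nor touches it after destruction.

```cpp
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a node; not an avplumber class.
struct ToyNode : std::enable_shared_from_this<ToyNode> {
    std::vector<std::string> delivered;

    // Returns a deferred task that owns the packet by value and refers to
    // the node only through a weak pointer.
    std::function<bool()> deferPut(std::string packet) {
        std::weak_ptr<ToyNode> weak = this->shared_from_this();
        return [weak, packet]() -> bool {
            std::shared_ptr<ToyNode> self = weak.lock();
            if (!self) return false;  // node already destroyed, drop the packet
            self->delivered.push_back(packet);
            return true;
        };
    }
};

// Runs one deferred task while the node is alive and one after it is gone.
inline std::pair<bool, bool> weakCaptureDemo() {
    auto node = std::make_shared<ToyNode>();
    auto first = node->deferPut("packet 1");
    auto second = node->deferPut("packet 2");
    bool ran_alive = first();  // node alive: the packet is delivered
    node.reset();              // last owner gone; the closures do not keep it alive
    bool ran_dead = second();  // node destroyed: the task becomes a no-op
    return {ran_alive, ran_dead};
}
```

Had the lambda captured this->shared_from_this() directly, node.reset() would not have destroyed the node while the closures existed; had it captured this, the second call would have dereferenced a dangling pointer.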
Note that in the above example, if tick_source is used (ticks==true), only one packet or frame will be processed per tick. If the input FPS is higher than the FPS of the tick source, data may accumulate in the queue. Wrap the whole processing in a loop to fix this shortcoming (TODO: wouldn't calling yieldAndProcess without the !ticks condition suffice?):
    virtual void processNonBlocking(EventLoop& evl, bool ticks) {
        do {
            T* dataptr = this->source_->peek(0);
            if (dataptr==nullptr) {
                if (!ticks) {
                    this->processWhenSignalled(this->edgeSource()->edge()->producedEvent());
                }
                return; // this will exit the loop if no more input is available right now
            }
            T &data = *dataptr;
            // ...
            // process the data
            // ...
            if (this->sink_->put(data, true)) {
                this->source_->pop();
                if (!ticks) {
                    this->yieldAndProcess();
                }
            } else {
                if (!ticks) {
                    this->processWhenSignalled(this->edgeSink()->edge()->consumedEvent());
                }
                return; // we don't have space in sink queue so, if ticks==true, retry in next tick
            }
        } while (ticks); // loop only if have tick_source, otherwise yieldAndProcess will handle processing the next input data
    }
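To see why the loop matters when the input rate exceeds the tick rate, here is a toy model in standard C++. It is only a sketch under simplifying assumptions (a std::deque instead of an avplumber edge, a sink that never fills up), and backlogAfterTicks is a hypothetical helper, not part of avplumber. It counts how many items remain queued after a number of ticks when each tick handles either one item or everything available.

```cpp
#include <cstddef>
#include <deque>

// Toy model, not avplumber code: `arrivals_per_tick` items enter the queue
// before every tick; each tick consumes either one item or the whole queue.
inline std::size_t backlogAfterTicks(int ticks, int arrivals_per_tick, bool drain_all) {
    std::deque<int> queue;
    int next_item = 0;
    for (int t = 0; t < ticks; ++t) {
        for (int i = 0; i < arrivals_per_tick; ++i) {
            queue.push_back(next_item++);
        }
        do {
            if (queue.empty()) break;  // nothing to read right now
            queue.pop_front();         // "process" one item
        } while (drain_all);           // loop only in the draining variant
    }
    return queue.size();
}
```

With 3 arrivals per tick over 10 ticks, the one-item-per-tick variant leaves 20 items queued, while the draining variant leaves none.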
A static method create needs to be defined in the class. It parses the data from the node definition (JSON object) and creates the node object. NodeSISO has a helper static method createCommon. Example of its usage:
    static std::shared_ptr<Firewall> create(NodeCreationInfo &nci) {
        EdgeManager &edges = nci.edges;
        const Parameters &params = nci.params;
        return NodeSISO<T, T>::template createCommon<Firewall>(edges, params);
    }
If your constructor requires additional parameters, they can be passed after edges and params in createCommon.
By the way, if you look at the source code of createCommon or other methods from the file src/graph_base.hpp, and the abstract source/sink classes in src/graph_core.hpp, you'll notice that we're creating source and sink objects wrapping the edges. The idea was to have multiple possible implementations of sources and sinks, not only edges (queues). In practice this was never used and some nodes use edges directly, so it should be refactored and simplified to reduce unnecessary boilerplate code. Pull requests welcome! (If you don't have time for coding but have an idea for a possible architecture, that's welcome, too.)
Commonly used node source & sink patterns are available as base classes or templates derived from Node:
- NodeSingleInput<InputType>
  - use this->source_->get() (blocking function) to get a packet/frame
  - or this->source_->peek(0) (0 to make it non-blocking) and this->source_->pop()
- NodeSingleOutput<OutputType>
  - use this->sink_->put(packet_or_frame) to output the packet/frame to the next node in the graph
- NodeSISO<InputType, OutputType> (Single Input, Single Output) - combines NodeSingleInput and NodeSingleOutput
- NodeMultiInput<InputType>
  - use int i = findSourceWithData() to get the index of a source edge that has a packet/frame waiting to be read. If the returned value isn't -1, read it using this->source_edges_[i]->peek() and call this->source_edges_[i]->pop() when you're done using it (i.e. you've passed it along to the next node, or it is to be discarded)
- NodeMultiOutputs<OutputType>
  - use this->sink_edges_[output_index]->enqueue(packet_or_frame) (blocking function) to output a packet/frame
  - or this->sink_edges_[output_index]->try_enqueue(packet_or_frame) for non-blocking operation
These bases define their own constructors. You should either call them in your constructor or write using BaseTemplate<TemplateArgument>::BaseTemplate; to explicitly use the base constructor in the derived class.
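Both ways of dealing with base constructors can be shown with a small self-contained sketch. ToyBase, PassThrough and Limiter are hypothetical names, and the constructor arguments are made up for the example; the real bases in src/graph_base.hpp take different arguments. The point is only the C++ mechanics: a using declaration inherits the base constructors unchanged, while a hand-written constructor forwards to the base and can accept extra parameters.

```cpp
#include <string>
#include <utility>

// Hypothetical base template standing in for a base such as NodeSISO;
// the constructor arguments here are invented for the example.
template <typename T>
class ToyBase {
public:
    ToyBase(std::string source_name, std::string sink_name)
        : source_name_(std::move(source_name)), sink_name_(std::move(sink_name)) {}
    const std::string &sourceName() const { return source_name_; }
    const std::string &sinkName() const { return sink_name_; }
private:
    std::string source_name_;
    std::string sink_name_;
};

// Variant 1: inherit the base constructors unchanged.
template <typename T>
class PassThrough : public ToyBase<T> {
public:
    using ToyBase<T>::ToyBase;
};

// Variant 2: own constructor that forwards to the base and adds a parameter.
template <typename T>
class Limiter : public ToyBase<T> {
public:
    Limiter(std::string source_name, std::string sink_name, int limit)
        : ToyBase<T>(std::move(source_name), std::move(sink_name)), limit_(limit) {}
    int limit() const { return limit_; }
private:
    int limit_;
};
```

The first variant is enough when the node has no parameters of its own; the second matches the case where additional arguments are passed through createCommon.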
You should generally use one or more of these bases, since they override virtual methods and implement interfaces important for internal avplumber operation. If you want to implement them on your own (because you need, for example, mixed data types on input or output), look at src/nodes/filters.cpp and src/graph_base.hpp for inspiration.
Thanks to multiple inheritance, you can use most (all?) of these bases in non-blocking nodes, too. See src/nodes/realtime.cpp for an example.
Interfaces are defined in src/graph_interfaces.hpp. They're used:
- for node management (src/graph_mgmt.cpp): the main loop (NodeWrapper::threadFunction) flow depends on what interfaces the node implements; IInterruptible can be used to stop the node bypassing any locks (node.interrupt command)
- for graph traversal, usually when a node needs to know something about stream metadata. EdgeBase (in src/graph_core.hpp) and NodeSingleInput (in src/graph_base.hpp) contain the findNodeUp method, which finds the nearest node, up in the graph, implementing a specific interface. Graph topology required by existing nodes is summarized in the README.
- for statistics extraction. Signal presence information is taken from ISentinel. Video & audio parameters are taken from IDecoder.
As summarized in the README, instance-shared objects can be used for storing state shared between nodes or even between instances.
If your shared object is specific to a node and doesn't need separate commands for controlling it, you can specify it directly within the node's .cpp source file (example: src/nodes/sentinel.cpp).
If, on the other hand, it is to be used by multiple nodes and/or you want some commands to be able to create or control it, specify it in a separate .hpp file (example: src/RealTimeTeam.hpp).
An instance-shared struct or class is, by convention, derived from InstanceShared<typename> (CRTP) from src/instance_shared.hpp. However, currently the base class isn't used for anything, so it is not necessary (but it may make documentation a little more organized once someone brave enough decides to run Doxygen on avplumber sources ;) )
To find the instance-shared object with a given name, use the method from src/instance_shared.hpp:
InstanceSharedObjects<ObjectType>::get(nci.instance, name)
It will return the shared pointer (std::shared_ptr<ObjectType>) to the object. If its fields are changed, all nodes will see the change (because it is a regular shared_ptr). So make sure to use atomics and/or mutexes when simultaneous access is likely.
- ObjectType is the typename of your shared object's struct or class
- nci.instance is the InstanceData reference belonging to the current avplumber instance. You can obtain it from nci, which is the node creation information given as an argument to the create static function of the node, or from NodeManager::instanceData() from src/graph_mgmt.hpp. In case of defining a command inside src/avplumber.cpp, use manager_->instanceData().
- name is the unique name of this specific object, used as a key in the hash map.
This assumes that either:
- the instance-shared object has a no-argument constructor. It will be created if it doesn't exist yet.
- or the instance-shared object has already been created by a different static method: put or emplace
In simpler cases where instance-shared objects can be created implicitly, you don't need to think about putting the objects into the hash map. Just write an argumentless constructor or define desired default values for all fields, and the get static method will do the magic.
However, if you need to create the object with some arguments, you have the following options:
    using ISOs = InstanceSharedObjects<ObjectType>;
    std::shared_ptr<ObjectType> shared_ptr_to_object = std::make_shared<ObjectType>(...arguments for constructor...);
    ISOs::put(nci.instance, name, shared_ptr_to_object, policy_if_exists);
or
    using ISOs = InstanceSharedObjects<ObjectType>;
    ISOs::emplace(nci.instance, name, policy_if_exists, ...arguments for constructor...);
policy_if_exists can be:
- ISOs::PolicyIfExists::Overwrite - overwrite the existing object
- ISOs::PolicyIfExists::Ignore - do not insert the new object
- ISOs::PolicyIfExists::Throw - throw an exception
Ignore is probably the most useful: it allows you to lazily initialize the object when its name appears for the first time and then reuse it. Be aware that if you use put with the Ignore policy and the name already exists, the object given to it will not end up in the hash table, so you need to re-read the object using get. It is better to use emplace, which doesn't waste CPU cycles creating an object that will soon be discarded.
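The difference between put and emplace under the Ignore policy is easier to see in a toy model. The sketch below is an assumption-laden stand-in, not the code from src/instance_shared.hpp: ToyRegistry and Counter are hypothetical, the per-instance argument (nci.instance) is omitted, and there is no locking. Only the policy semantics described above are reproduced.

```cpp
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>

enum class PolicyIfExists { Overwrite, Ignore, Throw };

// Toy name -> shared object map; a stand-in, not InstanceSharedObjects itself.
template <typename T>
class ToyRegistry {
public:
    // Returns the object, default-constructing it on first use.
    std::shared_ptr<T> get(const std::string &name) {
        std::shared_ptr<T> &slot = objects_[name];
        if (!slot) slot = std::make_shared<T>();
        return slot;
    }

    // Inserts an already constructed object according to the policy.
    void put(const std::string &name, std::shared_ptr<T> obj, PolicyIfExists policy) {
        auto it = objects_.find(name);
        if (it == objects_.end()) {
            objects_.emplace(name, std::move(obj));
            return;
        }
        if (policy == PolicyIfExists::Throw) throw std::runtime_error("object exists: " + name);
        if (policy == PolicyIfExists::Overwrite) it->second = std::move(obj);
        // Ignore: the existing object stays, the argument is dropped
    }

    // Constructs the object only if it will actually be stored.
    template <typename... Args>
    void emplace(const std::string &name, PolicyIfExists policy, Args &&...args) {
        auto it = objects_.find(name);
        if (it != objects_.end()) {
            if (policy == PolicyIfExists::Throw) throw std::runtime_error("object exists: " + name);
            if (policy == PolicyIfExists::Ignore) return;  // nothing is constructed
            it->second = std::make_shared<T>(std::forward<Args>(args)...);
            return;
        }
        objects_.emplace(name, std::make_shared<T>(std::forward<Args>(args)...));
    }

private:
    std::unordered_map<std::string, std::shared_ptr<T>> objects_;
};

// Hypothetical shared object with both a default and a one-argument constructor.
struct Counter {
    int value = 0;
    Counter() = default;
    explicit Counter(int v) : value(v) {}
};
```

In this model, calling put with Ignore on an existing name leaves the caller holding an object that is not the registered one, which is why a following get is needed to obtain the shared instance.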
In some cases you want the instance-shared object to be able to 'call back' the nodes. However, there is a risk of 2 types of circular references: compile time, when a node wants to access the shared object, which in turn wants to access the node; and runtime, when a shared_ptr loop is created and causes a memory leak. Possible ways of solving it:
- Store the callback (closure) inside the instance-shared object. If it needs to access the node's data, the closure must contain std::weak_ptr<NodeType>, not shared_ptr or a raw pointer or reference. Otherwise the node object will never be destroyed or use-after-free memory corruption will occur. From the shared object, simply call the callback.
- Declare an interface in src/graph_interfaces.hpp. Implement it in your node. Store std::weak_ptr<Interface> in the shared object. From the shared object, call methods of the node through the interface. This way, you don't need to split the node's source into separate .cpp and .hpp files.
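The interface-based option can be sketched in standard C++ as follows. All names here (ISpeedListener, ToySharedClock, ListeningNode) are hypothetical and exist only for illustration; in avplumber the interface would be declared in src/graph_interfaces.hpp and the node would derive from the usual node bases as well. The shared object sees only the interface and holds it weakly, so it neither depends on the node's header nor keeps the node alive.

```cpp
#include <memory>
#include <utility>
#include <vector>

// Hypothetical interface; stands in for one declared in graph_interfaces.hpp.
class ISpeedListener {
public:
    virtual ~ISpeedListener() = default;
    virtual void speedChanged(double speed) = 0;
};

// Hypothetical instance-shared object that calls back registered nodes.
class ToySharedClock {
public:
    void addListener(std::weak_ptr<ISpeedListener> listener) {
        listeners_.push_back(std::move(listener));
    }

    // Notifies the listeners that still exist and returns how many were called.
    int setSpeed(double speed) {
        int notified = 0;
        for (auto &weak : listeners_) {
            if (auto listener = weak.lock()) {  // skip nodes that no longer exist
                listener->speedChanged(speed);
                ++notified;
            }
        }
        return notified;
    }

private:
    std::vector<std::weak_ptr<ISpeedListener>> listeners_;
};

// Hypothetical node implementing the interface.
class ListeningNode : public ISpeedListener {
public:
    void speedChanged(double speed) override { last_speed = speed; }
    double last_speed = 1.0;
};
```

Because only weak pointers are stored, destroying the node simply makes the next notification skip it; a real implementation would probably also prune expired entries and guard the list with a mutex.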