Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix RTMP publish single AAC from ffmpeg client. #110

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions library/bytesio/src/bytesio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,25 @@ pub struct UdpIO {

impl UdpIO {
pub async fn new(remote_domain: String, remote_port: u16, local_port: u16) -> Option<Self> {
let remote_address = format!("{remote_domain}:{remote_port}");
let remote_address = if remote_domain == "localhost" {
format!("127.0.0.1:{remote_port}")
} else {
format!("{remote_domain}:{remote_port}")
};
log::info!("remote address: {}", remote_address);
let local_address = format!("0.0.0.0:{local_port}");
if let Ok(local_socket) = UdpSocket::bind(local_address).await {
if let Ok(remote_socket_addr) = remote_address.parse::<SocketAddr>() {
if let Err(err) = local_socket.connect(remote_socket_addr).await {
log::info!("connect to remote udp socket error: {}", err);
}

return Some(Self {
socket: local_socket,
});
} else {
log::error!("remote_address parse error: {:?}", remote_address);
}
return Some(Self {
socket: local_socket,
});
}

None
Expand Down Expand Up @@ -75,7 +82,7 @@ impl TNetIO for UdpIO {
Ok(data) => data,
Err(err) => Err(BytesIOError {
value: BytesIOErrorValue::TimeoutError(err),
})
}),
}
}

Expand Down Expand Up @@ -120,7 +127,7 @@ impl TNetIO for TcpIO {
Ok(data) => data,
Err(err) => Err(BytesIOError {
value: BytesIOErrorValue::TimeoutError(err),
})
}),
}
}

Expand Down
5 changes: 4 additions & 1 deletion library/container/flv/src/demuxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ impl FlvAudioTagDemuxer {
if tag_header.sound_format == SoundFormat::AAC as u8 {
match tag_header.aac_packet_type {
aac_packet_type::AAC_SEQHDR => {
self.aac_processor.audio_specific_config_load()?;
if self.aac_processor.bytes_reader.len() >= 2 {
self.aac_processor.audio_specific_config_load()?;
}

return Ok(FlvDemuxerAudioData::new());
}
aac_packet_type::AAC_RAW => {
Expand Down
42 changes: 39 additions & 3 deletions library/container/flv/src/muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,38 @@ use {
bytesio::bytes_writer::BytesWriter,
};

const FLV_HEADER: [u8; 9] = [
const FLV_HEADER_AV: [u8; 9] = [
0x46, // 'F'
0x4c, //'L'
0x56, //'V'
0x01, //version
0x05, //00000101 audio tag and video tag
0x00, 0x00, 0x00, 0x09, //flv header size
]; // 9
const FLV_HEADER_JUST_AUDIO: [u8; 9] = [
0x46, // 'F'
0x4c, //'L'
0x56, //'V'
0x01, //version
0x04, //00000101 audio tag and video tag
0x00, 0x00, 0x00, 0x09, //flv header size
]; // 9
const FLV_HEADER_JUST_VIDEO: [u8; 9] = [
0x46, // 'F'
0x4c, //'L'
0x56, //'V'
0x01, //version
0x01, //00000101 audio tag and video tag
0x00, 0x00, 0x00, 0x09, //flv header size
]; // 9
const FLV_HEADER_EMPTY_AV: [u8; 9] = [
0x46, // 'F'
0x4c, //'L'
0x56, //'V'
0x01, //version
0x00, //00000101 audio tag and video tag
0x00, 0x00, 0x00, 0x09, //flv header size
]; // 9
pub const HEADER_LENGTH: u32 = 11;
pub struct FlvMuxer {
pub writer: BytesWriter,
Expand All @@ -29,8 +53,20 @@ impl FlvMuxer {
}
}

pub fn write_flv_header(&mut self) -> Result<(), FlvMuxerError> {
self.writer.write(&FLV_HEADER)?;
pub fn write_flv_header(
&mut self,
has_audio: bool,
has_video: bool,
) -> Result<(), FlvMuxerError> {
if has_audio && has_video {
self.writer.write(&FLV_HEADER_AV)?;
} else if has_audio {
self.writer.write(&FLV_HEADER_JUST_AUDIO)?;
} else if has_video {
self.writer.write(&FLV_HEADER_JUST_VIDEO)?;
} else {
self.writer.write(&FLV_HEADER_EMPTY_AV)?;
}
Ok(())
}

Expand Down
4 changes: 4 additions & 0 deletions protocol/hls/src/flv2hls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ impl Flv2HlsRemuxer {
dts = data.dts;
pid = self.audio_pid;
payload.extend_from_slice(&data.data[..]);

if dts - self.last_ts_dts >= self.duration * 1000 {
self.need_new_segment = true;
}
}
_ => return Ok(()),
}
Expand Down
57 changes: 53 additions & 4 deletions protocol/httpflv/src/httpflv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ pub struct HttpFlv {

muxer: FlvMuxer,

has_audio: bool,
has_video: bool,
has_send_header: bool,

event_producer: StreamHubEventSender,
data_receiver: FrameDataReceiver,
/* now used for subscriber session */
Expand All @@ -52,6 +56,9 @@ impl HttpFlv {
app_name,
stream_name,
muxer: FlvMuxer::new(),
has_audio: false,
has_video: false,
has_send_header: false,
data_receiver,
statistic_data_sender: None,
event_producer,
Expand All @@ -70,14 +77,56 @@ impl HttpFlv {
}

pub async fn send_media_stream(&mut self) -> Result<(), HttpFLvError> {
self.muxer.write_flv_header()?;
self.muxer.write_previous_tag_size(0)?;

self.flush_response_data()?;
let mut retry_count = 0;

let mut max_av_frame_num_to_guess_av = 0;
// the first av frames are sequence configs, must be cached;
let mut cached_frames = Vec::new();
//write flv body
loop {
if let Some(data) = self.data_receiver.recv().await {
if !self.has_send_header {
max_av_frame_num_to_guess_av += 1;

match data {
FrameData::Audio {
timestamp: _,
data: _,
} => {
self.has_audio = true;
cached_frames.push(data);
}
FrameData::Video {
timestamp: _,
data: _,
} => {
self.has_video = true;
cached_frames.push(data);
}
FrameData::MetaData {
timestamp: _,
data: _,
} => cached_frames.push(data),
_ => {}
}

if (self.has_audio && self.has_video) || max_av_frame_num_to_guess_av > 10 {
self.has_send_header = true;
self.muxer
.write_flv_header(self.has_audio, self.has_video)?;
self.muxer.write_previous_tag_size(0)?;

self.flush_response_data()?;

for frame in &cached_frames {
self.write_flv_tag(frame.clone())?;
}
cached_frames.clear();
}

continue;
}

if let Err(err) = self.write_flv_tag(data) {
if let HttpFLvErrorValue::MpscSendError(err_in) = &err.value {
if err_in.is_disconnected() {
Expand Down
8 changes: 5 additions & 3 deletions protocol/rtmp/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub struct Cache {

impl Cache {
pub fn new(gop_num: usize, statistic_data_sender: Option<StatisticDataSender>) -> Self {

Cache {
metadata: metadata::MetaData::new(),
metadata_timestamp: 0,
Expand Down Expand Up @@ -78,7 +77,10 @@ impl Cache {
let mut reader = BytesReader::new(chunk_body.clone());
let tag_header = AudioTagHeader::unmarshal(&mut reader)?;

if tag_header.sound_format == define::SoundFormat::AAC as u8
let remain_bytes = reader.extract_remaining_bytes();

if remain_bytes.len() >= 2
&& tag_header.sound_format == define::SoundFormat::AAC as u8
&& tag_header.aac_packet_type == define::aac_packet_type::AAC_SEQHDR
{
self.audio_seq = chunk_body.clone();
Expand All @@ -88,7 +90,7 @@ impl Cache {
let mut aac_processor = Mpeg4AacProcessor::default();

let aac = aac_processor
.extend_data(reader.extract_remaining_bytes())
.extend_data(remain_bytes)
.audio_specific_config_load()?;

let statistic_audio_codec = StatisticData::AudioCodec {
Expand Down
Loading