ESPHome  2025.2.0
audio_pipeline.cpp
Go to the documentation of this file.
1 #include "audio_pipeline.h"
2 
3 #ifdef USE_ESP_IDF
4 
5 #include "esphome/core/defines.h"
6 #include "esphome/core/hal.h"
7 #include "esphome/core/helpers.h"
8 #include "esphome/core/log.h"
9 
10 namespace esphome {
11 namespace speaker {
12 
// Log tag used by every ESP_LOG* call in this file.
static constexpr const char *TAG = "speaker_media_player.pipeline";

// Playback is held back until roughly this many milliseconds of audio have been buffered.
static constexpr uint32_t INITIAL_BUFFER_MS = 1000;

// Stack depths handed to xTaskCreateStatic() for the reader and decoder tasks.
static constexpr uint32_t READ_TASK_STACK_SIZE = 5 * 1024;
static constexpr uint32_t DECODE_TASK_STACK_SIZE = 3 * 1024;

// Number of InfoErrorEvent slots in the queue the tasks use to report back to process_state().
static constexpr uint32_t INFO_ERROR_QUEUE_COUNT = 5;
21 
// Bits of the pipeline's FreeRTOS event group. The public methods and process_state() use them to send
// commands to the reader/decoder tasks; the tasks use them to report progress and errors back.
// NOTE(review): most enumerator lines (source lines 26, 29, 31, 34, 36, 38, 41) are missing from this
// listing. Judging by the comment preceding each and by their uses elsewhere in the file, they are presumably
// PIPELINE_COMMAND_STOP, READER_COMMAND_INIT_HTTP, READER_COMMAND_INIT_FILE, READER_MESSAGE_LOADED_MEDIA_TYPE,
// READER_MESSAGE_FINISHED, READER_MESSAGE_ERROR and DECODER_MESSAGE_FINISHED (bit values not shown).
22 enum EventGroupBits : uint32_t {
23  // MESSAGE_* bits are only set by their respective tasks
24
25  // Stops all activity in the pipeline elements; cleared by process_state() and set by stop(), by
  // start_url()/start_file() when already playing, or by either task when it hits an error
27
28  // Read audio from an HTTP source; cleared by reader task and set by process_state() after start_url()
30  // Read audio from an audio file from the flash; cleared by reader task and set by process_state() after start_file()
32
33  // Audio file type is read after checking it is supported; set by reader task and cleared by decoder task
35  // Reader is done (either through a failure or just end of the stream); set and cleared by reader task
37  // Error reading the file; set by reader task and cleared by process_state()
39
40  // Decoder is done (either through a failure or the end of the stream); set and cleared by decoder task
42  // Error decoding the file; set by decoder task and cleared by process_state()
43  DECODER_MESSAGE_ERROR = (1 << 13),
44 };
45 
// Constructs the audio pipeline.
//   speaker             - receives decoded audio (the decode task adds it as the decoder's sink)
//   buffer_size         - size of the ring buffer between reader and decoder; also bounds the transfer
//                         buffer size and the initial-buffering threshold used by the decode task
//   task_stack_in_psram - selects which allocator branch start_tasks_() uses for the task stacks
//                         (PSRAM vs. internal RAM going by the name — confirm against the header)
//   base_name           - prefix of the task names ("<base_name>_read" and "<base_name>_decode")
//   priority            - FreeRTOS priority passed to xTaskCreateStatic() for both tasks
46 AudioPipeline::AudioPipeline(speaker::Speaker *speaker, size_t buffer_size, bool task_stack_in_psram,
47  std::string base_name, UBaseType_t priority)
48  : base_name_(std::move(base_name)),
49  priority_(priority),
50  task_stack_in_psram_(task_stack_in_psram),
51  speaker_(speaker),
52  buffer_size_(buffer_size) {
  // NOTE(review): source line 53 is missing from this listing. Nothing visible in this file calls
  // allocate_communications_(), which creates event_group_ and info_error_queue_ — confirm where that happens.
  // Each transfer buffer is a quarter of the ring buffer, capped at DEFAULT_TRANSFER_BUFFER_SIZE.
54  this->transfer_buffer_size_ = std::min(buffer_size_ / 4, DEFAULT_TRANSFER_BUFFER_SIZE);
55 }
56 
57 void AudioPipeline::start_url(const std::string &uri) {
58  if (this->is_playing_) {
59  xEventGroupSetBits(this->event_group_, PIPELINE_COMMAND_STOP);
60  }
61  this->current_uri_ = uri;
62  this->pending_url_ = true;
63 }
64 
66  if (this->is_playing_) {
67  xEventGroupSetBits(this->event_group_, PIPELINE_COMMAND_STOP);
68  }
69  this->current_audio_file_ = audio_file;
70  this->pending_file_ = true;
71 }
72 
73 esp_err_t AudioPipeline::stop() {
74  xEventGroupSetBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
75 
76  return ESP_OK;
77 }
78 void AudioPipeline::set_pause_state(bool pause_state) {
79  this->speaker_->set_pause_state(pause_state);
80 
81  this->pause_state_ = pause_state;
82 }
83 
85  if (this->read_task_handle_ != nullptr) {
86  vTaskSuspend(this->read_task_handle_);
87  }
88  if (this->decode_task_handle_ != nullptr) {
89  vTaskSuspend(this->decode_task_handle_);
90  }
91 }
92 
94  if (this->read_task_handle_ != nullptr) {
95  vTaskResume(this->read_task_handle_);
96  }
97  if (this->decode_task_handle_ != nullptr) {
98  vTaskResume(this->decode_task_handle_);
99  }
100 }
101 
  // Body of AudioPipeline::process_state(), declared as `AudioPipelineState process_state()`: processes the
  // state of the audio pipeline based on info_error_queue_ and event_group_. The signature line (source
  // line 102) is missing from this listing.
  // NOTE(review): other lines of this body are missing too. 110, 118, 131, 134 and 137 look like `case`
  // labels. 178-179 completed the condition started on line 177. 173, 203, 208, 213, 217, 223 and 227
  // presumably returned an AudioPipelineState. This text is not compilable as shown.
103  /*
104  * Log items from info error queue
105  */
106  InfoErrorEvent event;
107  if (this->info_error_queue_ != nullptr) {
  // Drain every queued event without blocking (timeout 0) and log it.
108  while (xQueueReceive(this->info_error_queue_, &event, 0)) {
109  switch (event.source) {
111  if (event.err.has_value()) {
112  ESP_LOGE(TAG, "Media reader encountered an error: %s", esp_err_to_name(event.err.value()));
113  } else if (event.file_type.has_value()) {
114  ESP_LOGD(TAG, "Reading %s file type", audio_file_type_to_string(event.file_type.value()));
115  }
116
117  break;
119  if (event.err.has_value()) {
120  ESP_LOGE(TAG, "Decoder encountered an error: %s", esp_err_to_name(event.err.value()));
121  }
122
123  if (event.audio_stream_info.has_value()) {
124  ESP_LOGD(TAG, "Decoded audio has %d channels, %" PRId32 " Hz sample rate, and %d bits per sample",
125  event.audio_stream_info.value().get_channels(), event.audio_stream_info.value().get_sample_rate(),
126  event.audio_stream_info.value().get_bits_per_sample());
127  }
128
129  if (event.decoding_err.has_value()) {
130  switch (event.decoding_err.value()) {
132  ESP_LOGE(TAG, "Failed to parse the file's header.");
133  break;
135  ESP_LOGE(TAG, "Incompatible bits per sample. Only 16 bits per sample is supported");
136  break;
138  ESP_LOGE(TAG, "Incompatible number of channels. Only 1 or 2 channel audio is supported.");
139  break;
140  }
141  }
142  break;
143  }
144  }
145  }
146
147  /*
148  * Determine the current state based on the event group bits and tasks' status
149  */
150
151  EventBits_t event_bits = xEventGroupGetBits(this->event_group_);
152
153  if (this->pending_url_ || this->pending_file_) {
154  // Init command pending
155  if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
156  // Only start if there is no pending stop command
157  if ((this->read_task_handle_ == nullptr) || (this->decode_task_handle_ == nullptr)) {
158  // At least one task isn't running
  // NOTE(review): start_tasks_() returns an esp_err_t that is ignored here. If task creation fails, the
  // READER_COMMAND_INIT_* bit below is still set and is_playing_ still becomes true.
159  this->start_tasks_();
160  }
161
  // Hand the pending source to the reader task, which is waiting on these INIT bits.
162  if (this->pending_url_) {
163  xEventGroupSetBits(this->event_group_, EventGroupBits::READER_COMMAND_INIT_HTTP);
164  this->playback_ms_ = 0;
165  this->pending_url_ = false;
166  } else if (this->pending_file_) {
167  xEventGroupSetBits(this->event_group_, EventGroupBits::READER_COMMAND_INIT_FILE);
168  this->playback_ms_ = 0;
169  this->pending_file_ = false;
170  }
171
172  this->is_playing_ = true;
174  }
175  }
176
177  if ((event_bits & EventGroupBits::READER_MESSAGE_FINISHED) &&
180  // Tasks are finished and there's no media in between the reader and decoder
181
182  if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
183  // Stop command is fully processed, so clear the command bit
184  xEventGroupClearBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
185  this->hard_stop_ = true;
186  }
187
188  if (!this->is_playing_) {
189  // The tasks have been stopped for two ``process_state`` calls in a row, so delete the tasks
190  if ((this->read_task_handle_ != nullptr) || (this->decode_task_handle_ != nullptr)) {
191  this->delete_tasks_();
192  if (this->hard_stop_) {
193  // A stop command was sent, so end the playback immediately
194  this->speaker_->stop();
195  this->hard_stop_ = false;
196  } else {
197  // Decoded all the audio, so let the speaker finish playing before stopping
198  this->speaker_->finish();
199  }
200  }
201  }
202  this->is_playing_ = false;
204  }
205
206  if ((event_bits & EventGroupBits::READER_MESSAGE_ERROR)) {
207  xEventGroupClearBits(this->event_group_, EventGroupBits::READER_MESSAGE_ERROR);
209  }
210
211  if ((event_bits & EventGroupBits::DECODER_MESSAGE_ERROR)) {
212  xEventGroupClearBits(this->event_group_, EventGroupBits::DECODER_MESSAGE_ERROR);
214  }
215
216  if (this->pause_state_) {
218  }
219
220  if ((this->read_task_handle_ == nullptr) && (this->decode_task_handle_ == nullptr)) {
221  // No tasks are running, so the pipeline is stopped.
222  xEventGroupClearBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
224  }
225
  // Reaching here means the tasks are running and active. The flag is what makes the
  // "two process_state calls in a row" check above work.
226  this->is_playing_ = true;
228 }
229 
231  if (this->event_group_ == nullptr)
232  this->event_group_ = xEventGroupCreate();
233 
234  if (this->event_group_ == nullptr) {
235  return ESP_ERR_NO_MEM;
236  }
237 
238  if (this->info_error_queue_ == nullptr)
239  this->info_error_queue_ = xQueueCreate(INFO_ERROR_QUEUE_COUNT, sizeof(InfoErrorEvent));
240 
241  if (this->info_error_queue_ == nullptr)
242  return ESP_ERR_NO_MEM;
243 
244  return ESP_OK;
245 }
246 
  // Body of AudioPipeline::start_tasks_(), declared as `esp_err_t start_tasks_()`: common start code for
  // the pipeline, regardless of whether the source is a file or a URL.
  // Creates whichever of the reader/decoder tasks does not exist yet, allocating its static stack first.
  // An existing stack buffer is reused.
  // Returns:
  //   ESP_ERR_NO_MEM        - a stack cannot be allocated
  //   ESP_ERR_INVALID_STATE - xTaskCreateStatic() returns no handle
  //   ESP_OK                - otherwise
  // NOTE(review): several lines are missing from this listing:
  //   - the signature (source line 247)
  //   - the lines that presumably declare `stack_allocator` (251, 254, 277, 280)
  //   - the last argument line of the decoder's xTaskCreateStatic() call (292)
248  if (this->read_task_handle_ == nullptr) {
249  if (this->read_task_stack_buffer_ == nullptr) {
250  if (this->task_stack_in_psram_) {
252  this->read_task_stack_buffer_ = stack_allocator.allocate(READ_TASK_STACK_SIZE);
253  } else {
255  this->read_task_stack_buffer_ = stack_allocator.allocate(READ_TASK_STACK_SIZE);
256  }
257  }
258
259  if (this->read_task_stack_buffer_ == nullptr) {
260  return ESP_ERR_NO_MEM;
261  }
262
  // Always true here (the enclosing if already checked it); kept as written.
263  if (this->read_task_handle_ == nullptr) {
  // The name is a temporary std::string; FreeRTOS copies task names into the task control block,
  // so the temporary only has to live for the duration of the call.
264  this->read_task_handle_ =
265  xTaskCreateStatic(read_task, (this->base_name_ + "_read").c_str(), READ_TASK_STACK_SIZE, (void *) this,
266  this->priority_, this->read_task_stack_buffer_, &this->read_task_stack_);
267  }
268
269  if (this->read_task_handle_ == nullptr) {
270  return ESP_ERR_INVALID_STATE;
271  }
272  }
273
274  if (this->decode_task_handle_ == nullptr) {
275  if (this->decode_task_stack_buffer_ == nullptr) {
276  if (this->task_stack_in_psram_) {
278  this->decode_task_stack_buffer_ = stack_allocator.allocate(DECODE_TASK_STACK_SIZE);
279  } else {
281  this->decode_task_stack_buffer_ = stack_allocator.allocate(DECODE_TASK_STACK_SIZE);
282  }
283  }
284
285  if (this->decode_task_stack_buffer_ == nullptr) {
286  return ESP_ERR_NO_MEM;
287  }
288
  // Always true here (the enclosing if already checked it); kept as written.
289  if (this->decode_task_handle_ == nullptr) {
290  this->decode_task_handle_ =
291  xTaskCreateStatic(decode_task, (this->base_name_ + "_decode").c_str(), DECODE_TASK_STACK_SIZE, (void *) this,
293  }
294
295  if (this->decode_task_handle_ == nullptr) {
296  return ESP_ERR_INVALID_STATE;
297  }
298  }
299
300  return ESP_OK;
301 }
302 
  // Body of AudioPipeline::delete_tasks_(), declared as `void delete_tasks_()`. It resets the task-related
  // pointers and deallocates their stacks: each existing task is deleted with vTaskDelete(), then its
  // static stack is freed and both its stack pointer and handle are cleared.
  // NOTE(review): the signature (source line 303) and the lines that presumably declare `stack_allocator`
  // (309, 312, 326, 329) are missing from this listing.
304  if (this->read_task_handle_ != nullptr) {
305  vTaskDelete(this->read_task_handle_);
306
307  if (this->read_task_stack_buffer_ != nullptr) {
308  if (this->task_stack_in_psram_) {
310  stack_allocator.deallocate(this->read_task_stack_buffer_, READ_TASK_STACK_SIZE);
311  } else {
313  stack_allocator.deallocate(this->read_task_stack_buffer_, READ_TASK_STACK_SIZE);
314  }
315
316  this->read_task_stack_buffer_ = nullptr;
  // NOTE(review): the handle is only reset when a stack buffer was present. start_tasks_() never creates a
  // task without one, but clearing the handle unconditionally after vTaskDelete() would avoid keeping a
  // dangling handle if that invariant ever breaks. The same applies to the decode task below.
317  this->read_task_handle_ = nullptr;
318  }
319  }
320
321  if (this->decode_task_handle_ != nullptr) {
322  vTaskDelete(this->decode_task_handle_);
323
324  if (this->decode_task_stack_buffer_ != nullptr) {
325  if (this->task_stack_in_psram_) {
327  stack_allocator.deallocate(this->decode_task_stack_buffer_, DECODE_TASK_STACK_SIZE);
328  } else {
330  stack_allocator.deallocate(this->decode_task_stack_buffer_, DECODE_TASK_STACK_SIZE);
331  }
332
333  this->decode_task_stack_buffer_ = nullptr;
334  this->decode_task_handle_ = nullptr;
335  }
336  }
337 }
338 
// FreeRTOS task entry point for the reader; `params` is the owning AudioPipeline.
// Each loop iteration does the following:
//   1. Signal READER_MESSAGE_FINISHED.
//   2. Block until an init command or PIPELINE_COMMAND_STOP is set.
//   3. Open the file or URL with an audio::AudioReader and attach raw_file_ring_buffer_ as its sink.
//   4. Read until the reader finishes, fails, or a stop is requested.
// The loop never exits; the task is removed from outside by delete_tasks_().
// NOTE(review): source lines 348, 356-357, 359 and 421 are missing from this listing. They hold the rest of
// the wait mask, the rest of the clear-bits mask, presumably the event source assignment, and the first
// half of the condition ending on line 422.
339 void AudioPipeline::read_task(void *params) {
340  AudioPipeline *this_pipeline = (AudioPipeline *) params;
341
342  while (true) {
343  xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_FINISHED);
344
345  // Wait until the pipeline notifies us the source of the media file
346  EventBits_t event_bits =
347  xEventGroupWaitBits(this_pipeline->event_group_,
349  EventGroupBits::PIPELINE_COMMAND_STOP, // Bit message to read
350  pdFALSE, // Do not clear the bits on exit; they are cleared explicitly below
351  pdFALSE, // Return as soon as any one of the bits is set
352  portMAX_DELAY); // Block indefinitely until bit is set
353
354  if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
355  xEventGroupClearBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_FINISHED |
358  InfoErrorEvent event;
360  esp_err_t err = ESP_OK;
361
362  std::unique_ptr<audio::AudioReader> reader =
363  make_unique<audio::AudioReader>(this_pipeline->transfer_buffer_size_);
364
  // READER_COMMAND_INIT_FILE selects the flash file; any other wake-up without a stop is treated as HTTP.
365  if (event_bits & EventGroupBits::READER_COMMAND_INIT_FILE) {
366  err = reader->start(this_pipeline->current_audio_file_, this_pipeline->current_audio_file_type_);
367  } else {
368  err = reader->start(this_pipeline->current_uri_, this_pipeline->current_audio_file_type_);
369  }
370
371  if (err == ESP_OK) {
372  size_t file_ring_buffer_size = this_pipeline->buffer_size_;
373
374  std::shared_ptr<RingBuffer> temp_ring_buffer;
375
  // raw_file_ring_buffer_ is a std::weak_ptr. A use_count of 0 means no live ring buffer exists, so a new
  // one is created. temp_ring_buffer keeps it alive until reader->add_sink() below takes over.
  // Presumably add_sink() retains a reference, given the "ownership" comment at the end of this loop — confirm.
376  if (!this_pipeline->raw_file_ring_buffer_.use_count()) {
377  temp_ring_buffer = RingBuffer::create(file_ring_buffer_size);
378  this_pipeline->raw_file_ring_buffer_ = temp_ring_buffer;
379  }
380
381  if (!this_pipeline->raw_file_ring_buffer_.use_count()) {
382  err = ESP_ERR_NO_MEM;
383  } else {
384  reader->add_sink(this_pipeline->raw_file_ring_buffer_);
385  }
386  }
387
388  if (err != ESP_OK) {
389  // Send specific error message
390  event.err = err;
391  xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
392
393  // Setting up the reader failed, stop the pipeline
394  xEventGroupSetBits(this_pipeline->event_group_,
395  EventGroupBits::READER_MESSAGE_ERROR | EventGroupBits::PIPELINE_COMMAND_STOP);
396  } else {
397  // Send the file type to the pipeline
398  event.file_type = this_pipeline->current_audio_file_type_;
399  xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
400  xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_LOADED_MEDIA_TYPE);
401  }
402
  // Pump data until a stop is requested, the source is exhausted, or the reader fails.
  // On a setup failure, PIPELINE_COMMAND_STOP is already set, so this loop exits immediately.
403  while (true) {
404  event_bits = xEventGroupGetBits(this_pipeline->event_group_);
405
406  if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
407  break;
408  }
409
410  audio::AudioReaderState reader_state = reader->read();
411
412  if (reader_state == audio::AudioReaderState::FINISHED) {
413  break;
414  } else if (reader_state == audio::AudioReaderState::FAILED) {
415  xEventGroupSetBits(this_pipeline->event_group_,
416  EventGroupBits::READER_MESSAGE_ERROR | EventGroupBits::PIPELINE_COMMAND_STOP);
417  break;
418  }
419  }
420  event_bits = xEventGroupGetBits(this_pipeline->event_group_);
422  (this_pipeline->raw_file_ring_buffer_.use_count() == 1)) {
423  // Decoder task hasn't started yet, so delay a bit before releasing ownership of the ring buffer
424  delay(10);
425  }
426  }
427  }
428 }
429 
// FreeRTOS task entry point for the decoder; `params` is the owning AudioPipeline.
// Each loop iteration does the following:
//   1. Signal DECODER_MESSAGE_FINISHED.
//   2. Block until the reader reports the media type, or a stop is requested.
//   3. Create an audio::AudioDecoder that reads from raw_file_ring_buffer_.
//   4. Decode until it finishes, fails, or PIPELINE_COMMAND_STOP is raised.
// Once the stream info is known, it is validated (16 bits per sample, at most 2 channels) and the speaker is
// attached as the decoder's sink. The loop never exits; the task is removed from outside by delete_tasks_().
// NOTE(review): source lines 438, 446, 448, 538 and 543 are missing from this listing. They hold the rest
// of the wait mask, the rest of the clear-bits mask, presumably the event source assignment, and the MP3 and
// FLAC `case` labels.
430 void AudioPipeline::decode_task(void *params) {
431  AudioPipeline *this_pipeline = (AudioPipeline *) params;
432
433  while (true) {
434  xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::DECODER_MESSAGE_FINISHED);
435
436  // Wait until the reader notifies us that the media type is available
437  EventBits_t event_bits = xEventGroupWaitBits(this_pipeline->event_group_,
439  EventGroupBits::PIPELINE_COMMAND_STOP, // Bit message to read
440  pdFALSE, // Do not clear the bits on exit; they are cleared explicitly below
441  pdFALSE, // Return as soon as any one of the bits is set
442  portMAX_DELAY); // Block indefinitely until bit is set
443
444  if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
445  xEventGroupClearBits(this_pipeline->event_group_,
447  InfoErrorEvent event;
449
450  std::unique_ptr<audio::AudioDecoder> decoder =
451  make_unique<audio::AudioDecoder>(this_pipeline->transfer_buffer_size_, this_pipeline->transfer_buffer_size_);
452
453  esp_err_t err = decoder->start(this_pipeline->current_audio_file_type_);
454  decoder->add_source(this_pipeline->raw_file_ring_buffer_);
455
456  if (err != ESP_OK) {
457  // Send specific error message
458  event.err = err;
459  xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
460
461  // Setting up the decoder failed, stop the pipeline
462  xEventGroupSetBits(this_pipeline->event_group_,
463  EventGroupBits::DECODER_MESSAGE_ERROR | EventGroupBits::PIPELINE_COMMAND_STOP);
464  }
465
466  bool has_stream_info = false;
467  bool started_playback = false;
468
469  size_t initial_bytes_to_buffer = 0;
470
471  while (true) {
472  event_bits = xEventGroupGetBits(this_pipeline->event_group_);
473
474  if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
475  break;
476  }
477
478  // Update pause state
  // Output stays paused until playback has started. Playback starts either when the reader has finished, or
  // when enough data is buffered (see the check at the end of this loop). After that, the user's pause
  // state is applied.
479  if (!started_playback) {
480  if (!(event_bits & EventGroupBits::READER_MESSAGE_FINISHED)) {
481  decoder->set_pause_output_state(true);
482  } else {
483  started_playback = true;
484  }
485  } else {
486  decoder->set_pause_output_state(this_pipeline->pause_state_);
487  }
488
489  // Stop gracefully if the reader has finished
490  audio::AudioDecoderState decoder_state = decoder->decode(event_bits & EventGroupBits::READER_MESSAGE_FINISHED);
491
492  if ((decoder_state == audio::AudioDecoderState::DECODING) ||
493  (decoder_state == audio::AudioDecoderState::FINISHED)) {
494  this_pipeline->playback_ms_ = decoder->get_playback_ms();
495  }
496
497  if (decoder_state == audio::AudioDecoderState::FINISHED) {
498  break;
499  } else if (decoder_state == audio::AudioDecoderState::FAILED) {
  // A failure before any stream info was obtained is reported as a header parse failure.
500  if (!has_stream_info) {
501  event.decoding_err = DecodingError::FAILED_HEADER;
502  xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
503  }
504  xEventGroupSetBits(this_pipeline->event_group_,
505  EventGroupBits::DECODER_MESSAGE_ERROR | EventGroupBits::PIPELINE_COMMAND_STOP);
506  break;
507  }
508
  // Runs once per session, the first time the decoder exposes the stream's format.
509  if (!has_stream_info && decoder->get_audio_stream_info().has_value()) {
510  has_stream_info = true;
511
512  this_pipeline->current_audio_stream_info_ = decoder->get_audio_stream_info().value();
513
514  // Send the stream information to the pipeline
515  event.audio_stream_info = this_pipeline->current_audio_stream_info_;
516
517  if (this_pipeline->current_audio_stream_info_.get_bits_per_sample() != 16) {
518  // Error state, incompatible bits per sample
519  event.decoding_err = DecodingError::INCOMPATIBLE_BITS_PER_SAMPLE;
520  xEventGroupSetBits(this_pipeline->event_group_,
521  EventGroupBits::DECODER_MESSAGE_ERROR | EventGroupBits::PIPELINE_COMMAND_STOP);
522  } else if ((this_pipeline->current_audio_stream_info_.get_channels() > 2)) {
523  // Error state, incompatible number of channels
524  event.decoding_err = DecodingError::INCOMPATIBLE_CHANNELS;
525  xEventGroupSetBits(this_pipeline->event_group_,
526  EventGroupBits::DECODER_MESSAGE_ERROR | EventGroupBits::PIPELINE_COMMAND_STOP);
527  } else {
528  // Send audio directly to the speaker
529  this_pipeline->speaker_->set_audio_stream_info(this_pipeline->current_audio_stream_info_);
530  decoder->add_sink(this_pipeline->speaker_);
531  }
532
  // Start with the decoded size of INITIAL_BUFFER_MS, capped at 3/4 of the ring buffer. It is then scaled
  // down by an estimated compression ratio, because it is compared against the encoded bytes waiting in
  // raw_file_ring_buffer_.
533  initial_bytes_to_buffer = std::min(this_pipeline->current_audio_stream_info_.ms_to_bytes(INITIAL_BUFFER_MS),
534  this_pipeline->buffer_size_ * 3 / 4);
535
536  switch (this_pipeline->current_audio_file_type_) {
537 #ifdef USE_AUDIO_MP3_SUPPORT
539  initial_bytes_to_buffer /= 8; // Assume an MP3 compression factor of roughly 8
540  break;
541 #endif
542 #ifdef USE_AUDIO_FLAC_SUPPORT
544  initial_bytes_to_buffer /= 2; // Assume a FLAC compression factor of roughly 2
545  break;
546 #endif
547  default:
548  break;
549  }
550  xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
551  }
552
553  if (!started_playback && has_stream_info) {
554  // Verify enough data is available before starting playback
  // NOTE(review): weak_ptr::lock() returns an empty shared_ptr if the ring buffer has already been
  // released, and available() would then be called through a null pointer. Consider checking
  // temp_ring_buffer before use.
555  std::shared_ptr<RingBuffer> temp_ring_buffer = this_pipeline->raw_file_ring_buffer_.lock();
556  if (temp_ring_buffer->available() >= initial_bytes_to_buffer) {
557  started_playback = true;
558  }
559  }
560  }
561  }
562  }
563 }
564 
565 } // namespace speaker
566 } // namespace esphome
567 
568 #endif
value_type const & value() const
Definition: optional.h:89
static void decode_task(void *params)
esp_err_t start_tasks_()
Common start code for the pipeline, regardless if the source is a file or url.
optional< DecodingError > decoding_err
uint8_t get_channels() const
Definition: audio.h:29
void set_pause_state(bool pause_state)
optional< audio::AudioStreamInfo > audio_stream_info
void start_file(audio::AudioFile *audio_file)
Starts an audio pipeline given a AudioFile pointer.
uint8_t get_bits_per_sample() const
Definition: audio.h:28
AudioPipelineState process_state()
Processes the state of the audio pipeline based on the info_error_queue_ and event_group_.
STL namespace.
T * allocate(size_t n)
Definition: helpers.h:703
audio::AudioFile * current_audio_file_
void suspend_tasks()
Suspends any running tasks.
virtual void finish()
Definition: speaker.h:58
bool has_value() const
Definition: optional.h:87
audio::AudioStreamInfo current_audio_stream_info_
audio::AudioFileType current_audio_file_type_
void set_audio_stream_info(const audio::AudioStreamInfo &audio_stream_info)
Definition: speaker.h:99
std::weak_ptr< RingBuffer > raw_file_ring_buffer_
void resume_tasks()
Resumes any running tasks.
optional< audio::AudioFileType > file_type
size_t ms_to_bytes(uint32_t ms) const
Converts duration to bytes.
Definition: audio.h:73
void deallocate(T *p, size_t n)
Definition: helpers.h:720
uint8_t priority
void delete_tasks_()
Resets the task related pointers and deallocates their stacks.
esp_err_t allocate_communications_()
Allocates the event group and info error queue.
Implementation of SPI Controller mode.
Definition: a01nyub.cpp:7
virtual void set_pause_state(bool pause_state)
Definition: speaker.h:61
void start_url(const std::string &uri)
Starts an audio pipeline given a media url.
esp_err_t stop()
Stops the pipeline.
static void read_task(void *params)
An STL allocator that uses SPI or internal RAM.
Definition: helpers.h:683
static std::unique_ptr< RingBuffer > create(size_t len)
Definition: ring_buffer.cpp:22
virtual void stop()=0
const char * audio_file_type_to_string(AudioFileType file_type)
Helper function to convert file type to a const char string.
Definition: audio.cpp:40
void IRAM_ATTR HOT delay(uint32_t ms)
Definition: core.cpp:26
AudioPipeline(speaker::Speaker *speaker, size_t buffer_size, bool task_stack_in_psram, std::string base_name, UBaseType_t priority)