Python開發之路(6)— 綜合心知天氣、語音喚醒和百度API做一個語音天氣助手
#!/usr/bin/env python
import collections
import logging
import os
import time
import wave
from contextlib import contextmanager
from ctypes import *

import pyaudio
import snowboydetect
# Module-level logger named "snowboy", emitting INFO and above.
logging.basicConfig()
logger = logging.getLogger('snowboy')
logger.setLevel(logging.INFO)

# Resource files are looked up relative to the directory holding this file.
TOP_DIR = os.path.dirname(os.path.abspath(__file__))
RESOURCE_FILE = os.path.join(TOP_DIR, 'resources/common.res')
DETECT_DING = os.path.join(TOP_DIR, 'resources/ding.wav')
DETECT_DONG = os.path.join(TOP_DIR, 'resources/dong.wav')


def py_error_handler(filename, line, function, err, fmt):
    """Error callback that deliberately does nothing.

    It is wrapped into a C function pointer below and handed to
    ``snd_lib_error_set_handler`` so that library error messages are
    discarded instead of printed.

    :param filename: source file name reported by the library (bytes).
    :param line: line number reported by the library.
    :param function: function name reported by the library (bytes).
    :param err: error code reported by the library.
    :param fmt: message format string reported by the library (bytes).
    :return: None
    """
    pass


# C prototype matching py_error_handler: void f(char*, int, char*, int, char*).
ERROR_HANDLER_FUNC = CFUNCTYPE(None, c_char_p, c_int, c_char_p, c_int, c_char_p)

# Kept at module level so the ctypes callback object stays alive for as long
# as the C library may hold a pointer to it.
c_error_handler = ERROR_HANDLER_FUNC(py_error_handler)
@contextmanager
def no_alsa_error():
    """Context manager that silences ALSA library error output.

    On entry it tries to load ``libasound.so`` and install the module-level
    ``c_error_handler`` as the library's error handler; on exit it restores
    the default handler by passing ``None``. Installing the handler is
    best-effort: if it fails for any reason the body still runs, just
    without the suppression.

    Exceptions raised inside the ``with`` body always propagate unchanged,
    and the default handler is restored even when the body raises.
    """
    asound = None
    try:
        asound = cdll.LoadLibrary('libasound.so')
        asound.snd_lib_error_set_handler(c_error_handler)
    except Exception:
        # Best-effort only: a missing library or symbol must not prevent
        # the caller from using audio.
        asound = None

    # Exactly one yield, outside the setup try-block, so an exception from
    # the body is never swallowed and never triggers a second yield.
    try:
        yield
    finally:
        if asound is not None:
            asound.snd_lib_error_set_handler(None)


class RingBuffer(object):
    """Ring buffer to hold audio from PortAudio.

    Backed by a bounded ``collections.deque``: once ``size`` items are
    stored, appending more drops the oldest ones.
    """

    def __init__(self, size=4096):
        """Create an empty buffer holding at most ``size`` items."""
        self._buf = collections.deque(maxlen=size)

    def extend(self, data):
        """Add ``data`` to the end of the buffer."""
        self._buf.extend(data)

    def get(self):
        """Return the buffered data as ``bytes`` and clear the buffer."""
        tmp = bytes(bytearray(self._buf))
        self._buf.clear()
        return tmp
def play_audio_file(fname=DETECT_DONG):
    """Simple callback function to play a wave file.

    By default it plays the "dong" sound (``DETECT_DONG``).

    :param str fname: wave file name
    :return: None
    """
    # Read everything needed from the file up front so it can be closed
    # before the audio device is opened.
    ding_wav = wave.open(fname, 'rb')
    try:
        ding_data = ding_wav.readframes(ding_wav.getnframes())
        sample_width = ding_wav.getsampwidth()
        channels = ding_wav.getnchannels()
        rate = ding_wav.getframerate()
    finally:
        ding_wav.close()

    with no_alsa_error():
        audio = pyaudio.PyAudio()
    try:
        stream_out = audio.open(
            format=audio.get_format_from_width(sample_width),
            channels=channels,
            rate=rate, input=False, output=True)
        try:
            stream_out.start_stream()
            stream_out.write(ding_data)
            # Presumably lets the tail of the sound play out before the
            # stream is stopped -- TODO confirm.
            time.sleep(0.2)
            stream_out.stop_stream()
        finally:
            stream_out.close()
    finally:
        audio.terminate()


class HotwordDetector(object):
    """
    Snowboy decoder to detect whether a keyword specified by `decoder_model`
    exists in a microphone input stream.

    :param decoder_model: decoder model file path, a string or a list of strings
    :param resource: resource file path.
    :param sensitivity: decoder sensitivity, a float or a list of floats.
                        The bigger the value, the more sensitive the
                        decoder. If an empty list is provided, then the
                        default sensitivity in the model will be used.
    :param audio_gain: multiply input volume by this factor.
    :param apply_frontend: applies the frontend processing algorithm if True.
    """

    def __init__(self, decoder_model,
                 resource=RESOURCE_FILE,
                 sensitivity=[],
                 audio_gain=1,
                 apply_frontend=False):
        # NOTE: the list default for `sensitivity` is kept for interface
        # compatibility; it is only ever rebound, never mutated in place.
        if not isinstance(decoder_model, list):
            decoder_model = [decoder_model]
        if not isinstance(sensitivity, list):
            sensitivity = [sensitivity]
        model_str = ','.join(decoder_model)

        self.detector = snowboydetect.SnowboyDetect(
            resource_filename=resource.encode(), model_str=model_str.encode())
        self.detector.SetAudioGain(audio_gain)
        self.detector.ApplyFrontend(apply_frontend)
        self.num_hotwords = self.detector.NumHotwords()

        # A single sensitivity given for several models applies to all hotwords.
        if len(decoder_model) > 1 and len(sensitivity) == 1:
            sensitivity = sensitivity * self.num_hotwords
        if sensitivity:
            assert self.num_hotwords == len(sensitivity), \
                'number of hotwords in decoder_model (%d) and sensitivity ' \
                '(%d) does not match' % (self.num_hotwords, len(sensitivity))
            sensitivity_str = ','.join([str(t) for t in sensitivity])
            self.detector.SetSensitivity(sensitivity_str.encode())

        # Sized as channels * sample rate * 5 items.
        self.ring_buffer = RingBuffer(
            self.detector.NumChannels() * self.detector.SampleRate() * 5)

        # 1. Create the PyAudio object once; it is reused by every start().
        with no_alsa_error():
            self.audio = pyaudio.PyAudio()

    def _sample_format(self):
        """Return the PyAudio sample format matching the detector's bit depth."""
        # Integer division keeps the byte width an int under Python 3.
        return self.audio.get_format_from_width(
            self.detector.BitsPerSample() // 8)

    def start(self, detected_callback=play_audio_file,
              interrupt_check=lambda: False,
              sleep_time=0.03,
              audio_recorder_callback=None,
              silent_count_threshold=15,
              recording_timeout=100):
        """
        Start the voice detector. For every `sleep_time` second it checks the
        audio buffer for triggering keywords. If detected, then call
        corresponding function in `detected_callback`, which can be a single
        function (single model) or a list of callback functions (multiple
        models). Every loop it also calls `interrupt_check` -- if it returns
        True, then breaks from the loop and return.

        :param detected_callback: a function or list of functions. The number of
                                  items must match the number of models in
                                  `decoder_model`.
        :param interrupt_check: a function that returns True if the main loop
                                needs to stop.
        :param float sleep_time: how much time in second every loop waits.
        :param audio_recorder_callback: if specified, this will be called after
                                        a keyword has been spoken and after the
                                        phrase immediately after the keyword has
                                        been recorded. The function will be
                                        passed the name of the file where the
                                        phrase was recorded.
        :param silent_count_threshold: indicates how long silence must be heard
                                       to mark the end of a phrase that is
                                       being recorded.
        :param recording_timeout: limits the maximum length of a recording.
        :return: None
        """
        self._running = True

        def audio_callback(in_data, frame_count, time_info, status):
            # Queue the captured audio for the detection loop and hand back
            # silence (as bytes) for the output side of the duplex stream.
            self.ring_buffer.extend(in_data)
            play_data = b'\x00' * len(in_data)
            return play_data, pyaudio.paContinue

        # 2. Open the sound card.
        self.stream_in = self.audio.open(
            input=True, output=True,
            format=self._sample_format(),
            channels=self.detector.NumChannels(),
            rate=self.detector.SampleRate(),
            frames_per_buffer=2048,
            stream_callback=audio_callback)

        # 3. Once the sound card is open, print a one-line prompt.
        print('I am Listening......')

        if interrupt_check():
            logger.debug('detect voice return')
            return

        if not isinstance(detected_callback, list):
            detected_callback = [detected_callback]
        if len(detected_callback) == 1 and self.num_hotwords > 1:
            # Build a new list rather than `*=` so a list passed in by the
            # caller is not modified.
            detected_callback = detected_callback * self.num_hotwords

        assert self.num_hotwords == len(detected_callback), \
            'Error: hotwords in your models (%d) do not match the number of ' \
            'callbacks (%d)' % (self.num_hotwords, len(detected_callback))

        logger.debug('detecting...')

        state = 'PASSIVE'
        while self._running is True:
            if interrupt_check():
                logger.debug('detect voice break')
                break
            data = self.ring_buffer.get()
            if len(data) == 0:
                time.sleep(sleep_time)
                continue

            status = self.detector.RunDetection(data)
            if status == -1:
                logger.warning('Error initializing streams or reading audio data')

            # Small state machine to handle recording of the phrase that
            # follows the keyword.
            if state == 'PASSIVE':
                if status > 0:  # keyword found
                    self.recordedData = []
                    self.recordedData.append(data)
                    silent_count = 0
                    recording_count = 0
                    message = 'Keyword ' + str(status) + ' detected at time: '
                    message += time.strftime('%Y-%m-%d %H:%M:%S',
                                             time.localtime(time.time()))
                    logger.info(message)
                    callback = detected_callback[status - 1]
                    if callback is not None:
                        callback()

                    # Only record the follow-up phrase when somebody wants it.
                    if audio_recorder_callback is not None:
                        state = 'ACTIVE'
                    continue

            elif state == 'ACTIVE':
                stop_recording = False
                if recording_count > recording_timeout:
                    stop_recording = True
                elif status == -2:  # silence found
                    if silent_count > silent_count_threshold:
                        stop_recording = True
                    else:
                        silent_count = silent_count + 1
                elif status == 0:  # voice found
                    silent_count = 0

                if stop_recording:
                    fname = self.saveMessage()
                    audio_recorder_callback(fname)
                    state = 'PASSIVE'
                    continue

                recording_count = recording_count + 1
                self.recordedData.append(data)

        logger.debug('finished.')

    def saveMessage(self):
        """
        Save the message stored in self.recordedData to a timestamped file.

        :return: name of the wave file that was written.
        """
        filename = 'output' + str(int(time.time())) + '.wav'
        data = b''.join(self.recordedData)

        # Use wave to save data; close the file even if writing fails.
        wf = wave.open(filename, 'wb')
        try:
            wf.setnchannels(1)
            wf.setsampwidth(self.audio.get_sample_size(self._sample_format()))
            wf.setframerate(self.detector.SampleRate())
            wf.writeframes(data)
        finally:
            wf.close()
        logger.debug('finished saving: ' + filename)
        return filename

    def terminate(self):
        """
        Terminate audio stream. Users can call start() again to detect.

        :return: None
        """
        self.stream_in.stop_stream()
        self.stream_in.close()
        # self.audio is intentionally NOT terminated here: it is created once
        # in __init__ and start() reuses it to reopen the stream.
        self._running = False
0條評論