This repository has been archived on 2019-06-23. You can view files and clone it, but cannot push or open issues or pull requests.
mumbledj/vendor/github.com/layeh/gumble/gumbleffmpeg/stream.go
2016-06-20 17:50:40 -07:00

204 lines
4.1 KiB
Go

package gumbleffmpeg
import (
"encoding/binary"
"errors"
"io"
"os/exec"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/layeh/gumble/gumble"
)
// State represents the state of a Stream.
type State int32
// Valid states of Stream.
const (
StateInitial State = iota + 1
StatePlaying
StatePaused
StateStopped
)
// Stream is an audio stream that encodes media through ffmpeg and sends it to
// the server.
//
// A stream can only be used once; it cannot be started after it is stopped.
type Stream struct {
// Command to execute to play the file. Defaults to "ffmpeg".
Command string
// Playback volume (can be changed while the source is playing).
Volume float32
// Audio source (cannot be changed after stream starts).
Source Source
// Starting offset.
Offset time.Duration
client *gumble.Client
cmd *exec.Cmd
pipe io.ReadCloser
pause chan struct{}
elapsed int64
state State
l sync.Mutex
wg sync.WaitGroup
}
// New returns a new Stream for the given gumble Client and Source.
func New(client *gumble.Client, source Source) *Stream {
return &Stream{
client: client,
Volume: 1.0,
Source: source,
Command: "ffmpeg",
pause: make(chan struct{}),
state: StateInitial,
}
}
// Play begins playing
func (s *Stream) Play() error {
s.l.Lock()
defer s.l.Unlock()
switch s.state {
case StatePaused:
s.state = StatePlaying
go s.process()
return nil
case StatePlaying:
return errors.New("gumbleffmpeg: stream already playing")
case StateStopped:
return errors.New("gumbleffmpeg: stream has stopped")
}
// fresh stream
if s.Source == nil {
return errors.New("gumbleffmpeg: nil source")
}
args := s.Source.arguments()
if s.Offset > 0 {
args = append([]string{"-ss", strconv.FormatFloat(s.Offset.Seconds(), 'f', -1, 64)}, args...)
}
args = append(args, "-ac", strconv.Itoa(gumble.AudioChannels), "-ar", strconv.Itoa(gumble.AudioSampleRate), "-f", "s16le", "-")
cmd := exec.Command(s.Command, args...)
var err error
s.pipe, err = cmd.StdoutPipe()
if err != nil {
return err
}
if err := s.Source.start(cmd); err != nil {
return err
}
if err := cmd.Start(); err != nil {
s.Source.done()
return err
}
s.wg.Add(1)
s.cmd = cmd
s.state = StatePlaying
go s.process()
return nil
}
// State returns the state of the stream.
func (s *Stream) State() State {
s.l.Lock()
defer s.l.Unlock()
return s.state
}
// Pause pauses a playing stream.
func (s *Stream) Pause() error {
s.l.Lock()
if s.state != StatePlaying {
s.l.Unlock()
return errors.New("gumbleffmpeg: stream is not playing")
}
s.state = StatePaused
s.l.Unlock()
s.pause <- struct{}{}
return nil
}
// Stop stops the stream.
func (s *Stream) Stop() error {
s.l.Lock()
switch s.state {
case StateStopped, StateInitial:
s.l.Unlock()
return errors.New("gumbleffmpeg: stream is not playing nor paused")
}
s.cleanup()
s.Wait()
return nil
}
// Wait returns once the stream has stopped playing.
func (s *Stream) Wait() {
s.wg.Wait()
}
// Elapsed returns the amount of audio that has been played by the stream.
func (s *Stream) Elapsed() time.Duration {
return time.Duration(atomic.LoadInt64(&s.elapsed))
}
func (s *Stream) process() {
// s.state has been set to StatePlaying
interval := s.client.Config.AudioInterval
frameSize := s.client.Config.AudioFrameSize()
byteBuffer := make([]byte, frameSize*2)
outgoing := s.client.AudioOutgoing()
defer close(outgoing)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-s.pause:
return
case <-ticker.C:
if _, err := io.ReadFull(s.pipe, byteBuffer); err != nil {
s.l.Lock()
s.cleanup()
return
}
int16Buffer := make([]int16, frameSize)
for i := range int16Buffer {
float := float32(int16(binary.LittleEndian.Uint16(byteBuffer[i*2 : (i+1)*2])))
int16Buffer[i] = int16(s.Volume * float)
}
atomic.AddInt64(&s.elapsed, int64(interval))
outgoing <- gumble.AudioBuffer(int16Buffer)
}
}
}
func (s *Stream) cleanup() {
defer s.l.Unlock()
// s.l has been acquired
if s.state == StateStopped {
return
}
s.cmd.Process.Kill()
s.cmd.Wait()
s.Source.done()
for len(s.pause) > 0 {
<-s.pause
}
s.state = StateStopped
s.wg.Done()
}