Commit 383da05a authored by Simon Kirsten's avatar Simon Kirsten
Browse files

Commented code, simplified code, reorganized build system and is now using modules

parent d773856a
Pipeline #22231 failed with stages
in 20 seconds
stream-tv-server*
\ No newline at end of file
pages:
stage: deploy
image: python:alpine
image: python:3.7-alpine
before_script:
# TODO: we should consider also version locking these packages:
# + everything will work in the future
# - we wont see any future improvements in mkdocs or mkdocs-material
- pip install mkdocs
- pip install mkdocs-material
script:
......
server:
stage: build
image: golang
image: golang:1.12-alpine
script:
- env GOOS=darwin GOARCH=386 go build -o stream-tv-server-darwin-x86 ./cmd/stream-tv-server
- env GOOS=darwin GOARCH=amd64 go build -o stream-tv-server-darwin-x86_64 ./cmd/stream-tv-server
......@@ -15,6 +14,7 @@ server:
- env GOOS=windows GOARCH=amd64 go build -o stream-tv-server-windows-x86_64.exe ./cmd/stream-tv-server
artifacts:
expire_in: 100 yrs # unfortunately keep_forever or keep_latest is not (yet) available in GitLab.
paths:
- stream-tv-server-darwin-x86
- stream-tv-server-darwin-x86_64
......
# Include the subdirectorie's .gitlab-ci.yml
variables:
GITLAB_DOMAIN: "code.fbi.h-da.de"
# Include the two gitlab-ci files
include:
- '/.gitlab-ci-docs.yml'
......
......@@ -3,6 +3,5 @@
[![pipeline status](https://code.fbi.h-da.de/simons-nzse-2/stream-tv/badges/master/pipeline.svg)](https://code.fbi.h-da.de/simons-nzse-2/stream-tv/commits/master)
[![coverage report](https://code.fbi.h-da.de/simons-nzse-2/stream-tv/badges/master/coverage.svg)](https://code.fbi.h-da.de/simons-nzse-2/stream-tv/commits/master)
### [Stream TV Documentation](https://simons-nzse-2.h-da.io/stream-tv)
### [Documentation](https://simons-nzse-2.h-da.io/stream-tv)
TODO
\ No newline at end of file
package main
import (
"flag"
"fmt"
"log"
"net/http"
"os"
tv "code.fbi.h-da.de/simons-nzse-2/stream-tv/internal/tv"
twitch "code.fbi.h-da.de/simons-nzse-2/stream-tv/internal/twitch"
util "code.fbi.h-da.de/simons-nzse-2/stream-tv/internal/util"
website "code.fbi.h-da.de/simons-nzse-2/stream-tv/internal/website"
"github.com/urfave/cli"
"code.fbi.h-da.de/simons-nzse-2/stream-tv/internal/tv"
"code.fbi.h-da.de/simons-nzse-2/stream-tv/internal/twitch"
"code.fbi.h-da.de/simons-nzse-2/stream-tv/internal/util"
"code.fbi.h-da.de/simons-nzse-2/stream-tv/internal/website"
)
var listenAddr string
var openBrowser bool
const (
docURL = "https://simons-nzse-2.h-da.io/stream-tv/server/"
)
func init() {
const (
defaultListen = ":8080"
usageListen = "Address to listen on"
defaultBrowser = false
usageBrowser = "Automatically open the default browser"
)
func main() {
app := cli.NewApp()
flag.StringVar(&listenAddr, "listen", defaultListen, usageListen)
flag.StringVar(&listenAddr, "l", defaultListen, usageListen+" (shorthand)")
var port int
var browser bool
var local bool
var help bool
flag.BoolVar(&openBrowser, "browser", defaultBrowser, usageBrowser)
flag.BoolVar(&openBrowser, "b", defaultBrowser, usageBrowser+" (shorthand)")
}
app.Usage = "Stream TV Server " + docURL
app.HideHelp = true
func main() {
flag.Parse()
// TODO: we can embed the version either with git tags in the CI pipeline or just hardcode it
// for now just hide
app.HideVersion = true
app.Flags = []cli.Flag{
cli.IntFlag{
Name: "port, p",
Value: 8080,
Usage: "http port to listen on",
Destination: &port,
},
cli.BoolFlag{
Name: "browser, b",
Usage: "automatically open the default browser",
Destination: &browser,
},
cli.BoolFlag{
Name: "local, l",
Usage: "only listen on local interfaces (see doc)",
Destination: &local,
},
cli.BoolFlag{
Name: "help, h",
Usage: "show help",
Destination: &help,
},
}
mux := http.NewServeMux()
app.Action = func(c *cli.Context) error {
if help {
cli.ShowAppHelp(c)
return nil
}
mux.Handle("/", website.Handler())
mux.Handle("/twitch/", twitch.Handler())
mux.Handle("/tv/", tv.Handler())
if len(c.Args()) != 0 {
return fmt.Errorf("Unknown arguments: %v", c.Args())
}
fmt.Printf("Starting Stream TV Server on %s\n\n", listenAddr)
mux := http.NewServeMux()
fmt.Printf("IP addresses:\n")
mux.Handle("/", website.Handler())
mux.Handle("/twitch/", http.StripPrefix("/twitch", twitch.Handler()))
mux.Handle("/tv/", http.StripPrefix("/tv", tv.Handler()))
util.PrintIPAddresses()
var listenAddr string
if local {
listenAddr = fmt.Sprintf("127.0.0.1:%d", port)
} else {
listenAddr = fmt.Sprintf("0.0.0.0:%d", port)
}
fmt.Printf("\nRead the documentation at https://simons-nzse-2.h-da.io/stream-tv to use this server\n")
fmt.Printf("Stop with Ctrl-C or close this terminal\n")
fmt.Printf("Starting Stream TV Server on %s\n\n", listenAddr)
fmt.Printf("Listening on:\n")
listeningAddresses, err := util.GetListeningAddresses()
if err != nil {
// do nothing. range listeningAddresses is okay with nil
}
for _, laddr := range listeningAddresses {
addrStr := fmt.Sprintf("%s:%d", laddr.IP.String(), port)
if local && !laddr.IsLoopback {
continue
}
fmt.Printf(" http://%-21s %s\n", addrStr, laddr.InterfaceName)
}
fmt.Printf("\nRead the documentation at %s on how to use this server\n", docURL)
fmt.Printf("Stop with Ctrl-C or close this terminal\n")
if browser {
err := util.OpenBrowser(fmt.Sprintf("http://127.0.0.1:%d", port))
if err != nil {
fmt.Printf("Could not automatically open browser: %v\n", err)
}
}
return http.ListenAndServe(listenAddr, mux)
}
if openBrowser {
util.OpenBrowser("http://localhost" + listenAddr)
err := app.Run(os.Args)
if err != nil {
log.Fatal(err)
}
log.Fatal(http.ListenAndServe(listenAddr, mux))
}
// Copyright (c) 2017 Ismael Celis
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package tv
import (
"fmt"
// "log"
"net/http"
)
// Example SSE server in Golang.
// $ go run sse.go
type Broker struct {
// Events are pushed to this channel by the main events-gathering routine
Notifier chan []byte
// New client connections
newClients chan chan []byte
// Closed client connections
closingClients chan chan []byte
// Client connections registry
clients map[chan []byte]bool
}
func NewSSEBroker() (broker *Broker) {
// Instantiate a broker
broker = &Broker{
Notifier: make(chan []byte, 1),
newClients: make(chan chan []byte),
closingClients: make(chan chan []byte),
clients: make(map[chan []byte]bool),
}
// Set it running - listening and broadcasting events
go broker.listen()
return
}
func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
// Make sure that the writer supports flushing.
//
flusher, ok := rw.(http.Flusher)
if !ok {
http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
return
}
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
// rw.Header().Set("Access-Control-Allow-Origin", "*")
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(chan []byte)
// Signal the broker that we have a new connection
broker.newClients <- messageChan
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
broker.closingClients <- messageChan
}()
// Listen to connection close and un-register messageChan
notify := rw.(http.CloseNotifier).CloseNotify()
go func() {
<-notify
broker.closingClients <- messageChan
}()
for {
// Write to the ResponseWriter
// Server Sent Events compatible
fmt.Fprintf(rw, "data: %s\n\n", <-messageChan)
// Flush the data immediatly instead of buffering it for later.
flusher.Flush()
}
}
func (broker *Broker) listen() {
for {
select {
case s := <-broker.newClients:
// A new client has connected.
// Register their message channel
broker.clients[s] = true
// log.Printf("Client added. %d registered clients", len(broker.clients))
case s := <-broker.closingClients:
// A client has dettached and we want to
// stop sending them messages.
delete(broker.clients, s)
// log.Printf("Removed client. %d registered clients", len(broker.clients))
case event := <-broker.Notifier:
// We got a new event from the outside!
// Send event to all connected clients
for clientMessageChan, _ := range broker.clients {
clientMessageChan <- event
}
}
}
}
......@@ -6,24 +6,32 @@ import (
"net/http"
"strconv"
util "code.fbi.h-da.de/simons-nzse-2/stream-tv/internal/util"
"github.com/JamesStewy/sse"
"code.fbi.h-da.de/simons-nzse-2/stream-tv/internal/util"
)
// tvState struct defines the state of the TV.
type tvState struct {
LargeChannel *string `json:"large_channel"`
SmallChannel *string `json:"small_channel"`
Volume float32 `json:"volume"`
SmallScale float32 `json:"small_scale"`
ShowChat bool `json:"show_chat"`
}
// state is the actuall current state of the TV.
// It is initialized with default values.
var state = tvState{
LargeChannel: nil,
SmallChannel: nil,
Volume: 0.5,
SmallScale: 0.5,
ShowChat: false,
}
func tvStateHandlerFunc(w http.ResponseWriter, r *http.Request) {
// stateHandleFunc just serves the state as JSON encoded
func stateHandleFunc(w http.ResponseWriter, r *http.Request) {
_, err := util.ServeJSON(w, &state)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
......@@ -32,47 +40,89 @@ func tvStateHandlerFunc(w http.ResponseWriter, r *http.Request) {
}
}
var broker = NewSSEBroker()
func tvUpdateHandlerFunc(w http.ResponseWriter, r *http.Request) {
// clients holds the clients that are connected to the event handler. It is used to broadcast state changes to all SSE (Server-Sent Events) clients.
// Note: the only reason we use a sse.Client => bool map is that we can call *delete* with the client as key. The actual bool value that is stored holds no significance whatsoever.
// This is basically a *set*.
var clients map[*sse.Client]bool = make(map[*sse.Client]bool)
// updateHandleFunc updated the state based on the query string.
// For example
// /tv/update?large_channel=asdf&small_channel=null&volume=&small_scale=0.25&show_chat=true
// will
// - set large_channel to asdf
// - reset small_channel to nil/null
// - do nothing with volume (the parameter can also be omitted)
// - set small_scale to 0.25
// - set show_chat to true
// If the floats (volume and small_scale) or the boolean (show_chat) could not be parsed no changes are made at all and an error gets returned.
func updateHandleFunc(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
// the new state we will replace the current state with IF everything parses correctly.
newState := state
// we save the error messages of the 3 parsing steps here.
var errors []string
if q := query.Get("large_channel"); q != "" {
if q == "null" {
state.LargeChannel = nil
newState.LargeChannel = nil
} else {
state.LargeChannel = &q
newState.LargeChannel = &q
}
}
if q := query.Get("small_channel"); q != "" {
if q == "null" {
state.SmallChannel = nil
newState.SmallChannel = nil
} else {
newState.SmallChannel = &q
}
}
if q := query.Get("show_chat"); q != "" {
newShowChat, err := strconv.ParseBool(q)
if err != nil {
errors = append(errors, err.Error())
} else {
state.SmallChannel = &q
newState.ShowChat = newShowChat
}
}
if q := query.Get("volume"); q != "" {
newVolume, err := strconv.ParseFloat(q, 32)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Println(err)
errors = append(errors, err.Error())
} else {
state.Volume = float32(newVolume)
newState.Volume = float32(newVolume)
}
}
if q := query.Get("small_scale"); q != "" {
newSmallScale, err := strconv.ParseFloat(q, 32)
if err != nil {
errors = append(errors, err.Error())
} else {
newState.SmallScale = float32(newSmallScale)
}
}
// TODO: redo this error output
if len(errors) != 0 { // we had errors
_, err := util.ServeJSONWithStatus(w, errors, http.StatusBadRequest)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Println(err)
} else {
state.SmallScale = float32(newSmallScale)
return
}
log.Printf("Error(s) while parsing update query: %v\n", errors)
return
}
state = newState
body, err := util.ServeJSON(w, state)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
......@@ -80,7 +130,7 @@ func tvUpdateHandlerFunc(w http.ResponseWriter, r *http.Request) {
return
}
log.Printf("State:\n%s\n", string(body))
log.Printf("Updated state (%s):\n%s\n", r.URL.RawQuery, string(body))
response, err := json.Marshal(&state)
......@@ -90,16 +140,45 @@ func tvUpdateHandlerFunc(w http.ResponseWriter, r *http.Request) {
return
}
broker.Notifier <- response
// craft the SSE message
msg := sse.Msg{
Data: string(response),
}
// send it to all clients
for client := range clients {
client.Send(msg)
}
}
// eventsHandleFunc serves a SSE (Server-sent events) endpoint that the website(s) can connect to. It publishes the state when it gets changed.
func eventsHandleFunc(w http.ResponseWriter, r *http.Request) {
client, err := sse.ClientInit(w)
// return error if unable to initialise Server-Sent Events
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// add client to broadcast set
clients[client] = true
// remove client from broadcast set on exit
defer delete(clients, client)
// run the in the context of the request
client.Run(r.Context())
}
// Handler is a http.Handler that handles requests for the /tv backend.
// Handler returns a http.Handler that serves requests for the /tv backend
func Handler() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/tv/state", tvStateHandlerFunc)
mux.HandleFunc("/tv/update", tvUpdateHandlerFunc)
mux.HandleFunc("/tv/events", broker.ServeHTTP)
mux.Handle("/", http.NotFoundHandler()) // if the others don't match, return not found
mux.HandleFunc("/state", stateHandleFunc)
mux.HandleFunc("/update", updateHandleFunc)
mux.HandleFunc("/events", eventsHandleFunc)
return mux
}
......@@ -10,7 +10,7 @@ import (
"net/url"
"strings"
util "code.fbi.h-da.de/simons-nzse-2/stream-tv/internal/util"
"code.fbi.h-da.de/simons-nzse-2/stream-tv/internal/util"
)
// twitchStream is the structure for streams that we get from twitch.
......@@ -75,7 +75,7 @@ type Game struct {
LogoImgURL string `json:"logo_img_url"`
}
// Simplify a twitchStream (that we got from twitch) to a Stream (that we return to the user).
// toSimplified simplifies a twitchStream (that we got from twitch) to a Stream (that we return to the user).
func (s *twitchStream) toSimplified() Stream {
return Stream{
AverageFPS: int(math.Round(s.AverageFPS)),
......@@ -93,7 +93,7 @@ func (s *twitchStream) toSimplified() Stream {
}
}
// Simplify a twitchGame (that we got from twitch) to a Game (that we return to the user).
// toSimplified simplifies a twitchGame (that we got from twitch) to a Game (that we return to the user).
func (g *twitchGame) toSimplified() Game {
return Game{
Name: g.Game.Name,
......@@ -103,6 +103,7 @@ func (g *twitchGame) toSimplified() Game {
}
}
// TODO: comment from here on
func twitchRequest(endpoint string, query url.Values, output interface{}) error {
query.Set("limit", "100")
......@@ -178,7 +179,7 @@ func channelsToID(channels []string) ([]string, error) {
return ids, nil
}
func getFeaturedStreams(w http.ResponseWriter, r *http.Request) {
func streamsFeaturedHandleFunc(w http.ResponseWriter, r *http.Request) {
var twitchFeaturedStreamsResponse struct {
Featured []struct {
......@@ -193,12 +194,11 @@ func getFeaturedStreams(w http.ResponseWriter, r *http.Request) {
return
}
featuredStreamsResponse := make([]Stream, 0)
var featuredStreamsResponse []Stream
for _, featured := range twitchFeaturedStreamsResponse.Featured {
if featured.Stream.Game != "" {
featuredStreamsResponse = append(featuredStreamsResponse,
featured.Stream.toSimplified())
featuredStreamsResponse = append(featuredStreamsResponse, featured.Stream.toSimplified())
}
}
......@@ -210,7 +210,7 @@ func getFeaturedStreams(w http.ResponseWriter, r *http.Request) {
}
}
func getTopStreams(w http.ResponseWriter, r *http.Request) {
func streamsTopHandleFunc(w http.ResponseWriter, r *http.Request) {
var twitchStreamsResponse struct {
Streams []twitchStream `json:"streams"`
......@@ -252,12 +252,11 @@ func getTopStreams(w http.ResponseWriter, r *http.Request) {
return
}
topStreamsResponse := make([]Stream, 0)
var topStreamsResponse []Stream