A small and light tool to help with FreeBSD Ports CI (Continuous Integration)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

main.go 8.5KB


  1. package main
  2. import (
  3. "bytes"
  4. "crypto/hmac"
  5. "crypto/sha1"
  6. "crypto/tls"
  7. "encoding/json"
  8. "flag"
  9. "fmt"
  10. "io/ioutil"
  11. "log"
  12. "net/http"
  13. "os"
  14. "os/exec"
  15. "path"
  16. "path/filepath"
  17. "regexp"
  18. "strings"
  19. "sync"
  20. "time"
  21. "gopkg.in/yaml.v2"
  22. )
  23. type controller struct {
  24. wg *sync.WaitGroup
  25. cfg *config
  26. }
  27. type queue struct {
  28. Name string
  29. Recipe string
  30. Environment map[string]string
  31. queue chan worker
  32. }
  33. type config struct {
  34. Workdir string
  35. Logdir string
  36. Server struct {
  37. Host string
  38. BaseURL string
  39. TLScert string
  40. TLSkey string
  41. }
  42. Webhook struct {
  43. Secret string
  44. }
  45. Repository struct {
  46. APIURL string
  47. APIToken string
  48. }
  49. Queues []queue
  50. DefaultQueues []string `yaml:"default_queues"`
  51. }
  52. type worker struct {
  53. ID string
  54. Status string
  55. Queue queue
  56. Port string
  57. Commit string
  58. RepoURL string
  59. RepoName string
  60. RepoFullName string
  61. }
  // gitPushEventData is the subset of a push webhook payload that the handler
  // decodes from JSON.
  type gitPushEventData struct {
  	// Secret is the shared secret carried inside the payload; it is only
  	// consulted when no X-Hub-Signature header is present.
  	Secret string `json:"secret"`

  	// CommitID is the payload's "after" value, the commit the status is
  	// reported against.
  	CommitID string `json:"after"`

  	// Repository identifies the repository that was pushed to.
  	Repository struct {
  		Name     string `json:"name"`
  		FullName string `json:"full_name"`
  		URL      string `json:"clone_url"`
  	} `json:"repository"`

  	// Commits lists the pushed commits; only the messages are decoded.
  	Commits []struct {
  		Message string `json:"message"`
  	} `json:"commits"`
  }
  // calcSignature returns the HMAC-SHA1 of payload keyed with secret, rendered
  // as "sha1=<lowercase hex>" so it can be compared with an X-Hub-Signature
  // header value.
  func calcSignature(payload *[]byte, secret string) string {
  	h := hmac.New(sha1.New, []byte(secret))
  	h.Write(*payload)
  	digest := h.Sum(nil)
  	return "sha1=" + fmt.Sprintf("%x", digest)
  }
  // newWorkerID derives a job ID from the current local time, formatted as
  // YYYYMMDDhhmmss followed by a dot and six fractional-second digits.
  func newWorkerID() string {
  	const layout = "20060102150405.000000"
  	now := time.Now()
  	return now.Format(layout)
  }
  // portPattern matches a "category/port" origin: a lowercase category, a
  // slash and a port name. It is compiled once instead of on every call.
  var portPattern = regexp.MustCompile(`^([a-z0-9-]+)/([a-zA-Z0-9-_.]+)$`)

  // getPortFromMessage extracts the port origin from the first line of a commit
  // message of the form "category/port: summary".
  //
  // It returns the whitespace-trimmed text before the first colon of the first
  // line when that text looks like "category/port", and "" otherwise (no colon,
  // colon in the first column, or a prefix that is not a port origin).
  func getPortFromMessage(msg string) string {
  	// Only the subject line is inspected.
  	subject := msg
  	if nl := strings.IndexByte(subject, '\n'); nl >= 0 {
  		subject = subject[:nl]
  	}

  	colon := strings.IndexByte(subject, ':')
  	if colon < 1 {
  		return ""
  	}

  	port := strings.TrimSpace(subject[:colon])
  	if !portPattern.MatchString(port) {
  		return ""
  	}
  	return port
  }
  94. func (c *controller) getQueueInfoFromMessage(msg string) []queue {
  95. queues := make([]queue, 0)
  96. lines := strings.Split(msg, "\n")
  97. for _, line := range lines {
  98. line = strings.ToLower(line)
  99. if strings.HasPrefix(line, "ci:") {
  100. if strings.Contains(line, "no") || strings.Contains(line, "false") {
  101. return queues
  102. }
  103. if strings.Contains(line, "yes") || strings.Contains(line, "true") {
  104. for i := range c.cfg.Queues {
  105. queues = append(queues, c.cfg.Queues[i])
  106. }
  107. return queues
  108. }
  109. }
  110. }
  111. for _, name := range c.cfg.DefaultQueues {
  112. q := c.getQueueByName(name)
  113. if q.Name == "" {
  114. continue
  115. }
  116. queues = append(queues, q)
  117. }
  118. return queues
  119. }
  120. func (c *controller) getQueueByName(name string) queue {
  121. for i := range c.cfg.Queues {
  122. if c.cfg.Queues[i].Name == name {
  123. return c.cfg.Queues[i]
  124. }
  125. }
  126. return queue{}
  127. }
  128. func (c *controller) sendStatusUpdate(wrk worker) error {
  129. target := ""
  130. if wrk.Status != "pending" {
  131. target = fmt.Sprintf("%s/logs/%s.txt", c.cfg.Server.BaseURL, wrk.ID)
  132. }
  133. url := fmt.Sprintf("%s/repos/%s/statuses/%s?access_token=%s",
  134. c.cfg.Repository.APIURL, wrk.RepoFullName, wrk.Commit, c.cfg.Repository.APIToken)
  135. jsonValue, _ := json.Marshal(map[string]string{
  136. "state": wrk.Status,
  137. "target_url": target,
  138. "context": wrk.Queue.Name,
  139. })
  140. _, err := http.Post(url, "application/json", bytes.NewBuffer(jsonValue))
  141. return err
  142. }
  143. func (c *controller) startWorker(workChan chan worker) {
  144. defer c.wg.Done()
  145. for {
  146. select {
  147. case wrk := <-workChan:
  148. queue := c.getQueueByName(wrk.Queue.Name)
  149. log.Printf("ID %s started on %s\n", wrk.ID, queue.Name)
  150. c.sendStatusUpdate(wrk)
  151. env := append(os.Environ(),
  152. fmt.Sprintf("JOB_ID=%s", wrk.ID),
  153. fmt.Sprintf("COMMIT_ID=%s", wrk.Commit),
  154. fmt.Sprintf("REPO_URL=%s", wrk.RepoURL),
  155. fmt.Sprintf("JOB_PORT=%s", wrk.Port),
  156. )
  157. for k, v := range queue.Environment {
  158. env = append(env, fmt.Sprintf("%s=%s", k, v))
  159. }
  160. workdir := strings.Replace(queue.Name, "/", "", -1)
  161. workdir = strings.Replace(workdir, " ", "", -1)
  162. workdir = path.Join(c.cfg.Workdir, workdir)
  163. os.MkdirAll(workdir, os.ModePerm)
  164. cmd := exec.Cmd{
  165. Dir: workdir,
  166. Env: env,
  167. Path: "/usr/bin/make",
  168. Args: []string{
  169. "make",
  170. "-C", workdir,
  171. "-f", fmt.Sprintf("%s.mk", queue.Recipe),
  172. "-I", c.cfg.Workdir,
  173. "all",
  174. },
  175. }
  176. output, err := cmd.CombinedOutput()
  177. if err != nil {
  178. wrk.Status = "failure"
  179. } else {
  180. wrk.Status = "success"
  181. }
  182. ioutil.WriteFile(path.Join(c.cfg.Logdir, wrk.ID+".txt"), output, 0600)
  183. log.Printf("ID %s finished %s\n", wrk.ID, wrk.Status)
  184. c.sendStatusUpdate(wrk)
  185. case <-time.After(time.Second * 1):
  186. }
  187. }
  188. }
  189. func (c *controller) handleWebhook(w http.ResponseWriter, r *http.Request) {
  190. payload, err := ioutil.ReadAll(r.Body)
  191. if err != nil {
  192. http.Error(w, "Internal Error", http.StatusInternalServerError)
  193. return
  194. }
  195. if r.Header.Get("X-GitHub-Event") != "" {
  196. if r.Header.Get("X-GitHub-Event") != "push" {
  197. http.Error(w, "Invalid webhook", http.StatusBadRequest)
  198. return
  199. }
  200. }
  201. data := gitPushEventData{}
  202. if err = json.Unmarshal(payload, &data); err != nil {
  203. http.Error(w, "Failed to parse webhook data", http.StatusBadRequest)
  204. return
  205. }
  206. if c.cfg.Webhook.Secret != "" {
  207. if r.Header.Get("X-Hub-Signature") != "" {
  208. if calcSignature(&payload, c.cfg.Webhook.Secret) != r.Header.Get("X-Hub-Signature") {
  209. http.Error(w, "Invalid secret", http.StatusBadRequest)
  210. return
  211. }
  212. } else {
  213. if data.Secret != c.cfg.Webhook.Secret {
  214. http.Error(w, "Invalid secret", http.StatusBadRequest)
  215. return
  216. }
  217. }
  218. }
  219. port := getPortFromMessage(data.Commits[0].Message)
  220. if port == "" {
  221. fmt.Fprint(w, "No category/port detected in commit message")
  222. return
  223. }
  224. cnt := 0
  225. for _, q := range c.getQueueInfoFromMessage(data.Commits[0].Message) {
  226. job := worker{
  227. ID: newWorkerID(),
  228. Status: "pending",
  229. Queue: q,
  230. Port: port,
  231. Commit: data.CommitID,
  232. RepoURL: data.Repository.URL,
  233. RepoName: data.Repository.Name,
  234. RepoFullName: data.Repository.FullName,
  235. }
  236. select {
  237. case q.queue <- job:
  238. cnt++
  239. log.Printf("%s Port %s queued on %s (pos %d)\n", job.ID, job.Port, q.Name, len(q.queue))
  240. default:
  241. log.Printf("%s Queue limit reached on queue %s\n", job.ID, q.Name)
  242. }
  243. }
  244. fmt.Fprintf(w, "%d Jobs queued", cnt)
  245. }
  246. func (c *controller) startHTTPD() {
  247. defer c.wg.Done()
  248. mux := http.NewServeMux()
  249. fs := http.FileServer(http.Dir("logs"))
  250. mux.Handle("/logs/", http.StripPrefix("/logs/", fs))
  251. mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  252. if r.Method == "GET" {
  253. fmt.Fprint(w, "nothing to see here")
  254. } else {
  255. c.handleWebhook(w, r)
  256. }
  257. })
  258. var err error
  259. if c.cfg.Server.TLScert != "" && c.cfg.Server.TLSkey != "" {
  260. cfg := &tls.Config{
  261. MinVersion: tls.VersionTLS12,
  262. CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
  263. PreferServerCipherSuites: true,
  264. CipherSuites: []uint16{
  265. tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
  266. tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
  267. tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
  268. tls.TLS_RSA_WITH_AES_256_CBC_SHA,
  269. },
  270. }
  271. srv := &http.Server{
  272. Addr: c.cfg.Server.Host,
  273. Handler: mux,
  274. TLSConfig: cfg,
  275. TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler), 0),
  276. }
  277. log.Printf("Listening on %s (https)\n", c.cfg.Server.Host)
  278. err = srv.ListenAndServeTLS(c.cfg.Server.TLScert, c.cfg.Server.TLSkey)
  279. } else {
  280. srv := &http.Server{
  281. Addr: c.cfg.Server.Host,
  282. Handler: mux,
  283. }
  284. log.Printf("Listening on %s (http)\n", c.cfg.Server.Host)
  285. err = srv.ListenAndServe()
  286. }
  287. if err != nil {
  288. log.Printf("Listen failed: %s\n", err)
  289. }
  290. }
  291. func parseConfig(file string) config {
  292. f, err := os.Open(file)
  293. if err != nil {
  294. log.Fatalf("Error: %v", err)
  295. }
  296. defer f.Close()
  297. dec := yaml.NewDecoder(f)
  298. cfg := config{}
  299. err = dec.Decode(&cfg)
  300. if err != nil {
  301. log.Fatalf("Error: %v", err)
  302. }
  303. cfg.Workdir, _ = filepath.Abs(cfg.Workdir)
  304. cfg.Logdir, _ = filepath.Abs(cfg.Logdir)
  305. cfg.Server.BaseURL = strings.TrimSuffix(cfg.Server.BaseURL, "/")
  306. cfg.Repository.APIURL = strings.TrimSuffix(cfg.Repository.APIURL, "/")
  307. return cfg
  308. }
  309. func main() {
  310. var cfgfile string
  311. flag.StringVar(&cfgfile, "config", "caronade.yaml", "Path to config file")
  312. flag.Parse()
  313. cfg := parseConfig(cfgfile)
  314. wg := sync.WaitGroup{}
  315. for i := range cfg.Queues {
  316. log.Printf("Adding queue %s\n", cfg.Queues[i].Name)
  317. cfg.Queues[i].queue = make(chan worker, 10)
  318. }
  319. ctrl := controller{
  320. wg: &wg,
  321. cfg: &cfg,
  322. }
  323. for i := range cfg.Queues {
  324. wg.Add(1)
  325. go ctrl.startWorker(cfg.Queues[i].queue)
  326. }
  327. wg.Add(1)
  328. go ctrl.startHTTPD()
  329. wg.Wait()
  330. }