feat: new deploy queue with live job logs

main
Antonio De Lucreziis 10 months ago
parent 3083dc5f18
commit ad49ce171d

@ -0,0 +1,26 @@
import { useEffect, useState } from "preact/hooks"
export const JobLogs = ({}) => {
const [logLines, setLogLines] = useState([])
useEffect(async () => {
const res = await fetch(location.href + '/logs?format=raw')
const rawLogs = (await res.text()).trim()
if (rawLogs.length > 0) setLogLines(rawLogs.split('\n'))
// Setup SSE
const es = new EventSource(location.href + '/logs?format=sse')
es.addEventListener('message', ({ data }) => {
const event = JSON.parse(data)
setLogLines(lines => [
...lines,
event.content,
])
})
}, [])
return (
<pre><code>{logLines.join('\n')}</code></pre>
)
}

@ -6,8 +6,8 @@ import { durationToString } from './lib/utils'
/** /**
* @param {import('@/jobs.ts').QueuedJob} props * @param {import('@/jobs.ts').QueuedJob} props
*/ */
export const QueuedJob = ({ uuid, name, submitter, submittedAt }) => ( export const QueuedJob = ({ uuid, status, name, submitter, submittedAt }) => (
<div class="job queued" title={uuid}> <div class={'job ' + status} title={uuid} onClick={() => (location.href = `/jobs/${uuid}`)}>
<div class="name">{name}</div> <div class="name">{name}</div>
<div class="footer"> <div class="footer">
<div class="submitted-at">{new Date(submittedAt).toLocaleString()}</div> <div class="submitted-at">{new Date(submittedAt).toLocaleString()}</div>
@ -18,18 +18,9 @@ export const QueuedJob = ({ uuid, name, submitter, submittedAt }) => (
/** /**
* @param {import('@/jobs.ts').CompletedJob} props * @param {import('@/jobs.ts').CompletedJob} props
*/ */
export const CompletedJob = ({ export const CompletedJob = ({ uuid, name, submittedAt, startedAt, completedAt }) => {
uuid,
name,
submitter,
submittedAt,
startedAt,
completedAt,
successful,
error,
}) => {
return ( return (
<div class="job completed" title={uuid}> <div class="job completed" title={uuid} onClick={() => (location.href = `/jobs/${uuid}`)}>
<div class="name">{name}</div> <div class="name">{name}</div>
<div class="footer"> <div class="footer">
<div class="submitted-at">{new Date(submittedAt).toLocaleString()}</div> <div class="submitted-at">{new Date(submittedAt).toLocaleString()}</div>
@ -40,54 +31,33 @@ export const CompletedJob = ({
} }
export const JobsPage = ({}) => { export const JobsPage = ({}) => {
const [jobStore, setJobStore] = useState({ const [jobStore, setJobStore] = useState({})
queuedJobs: {},
completedJobs: {},
})
useEffect(async () => { useEffect(async () => {
const res = await fetch('/api/jobs') // const { searchParams } = new URL(location.href)
const { queuedJobs, completedJobs } = await res.json()
const result = { // const limit = searchParams.has('limit') ? parseInt(searchParams.get('limit')) : 10
queuedJobs: {}, // const offset = (searchParams.has('page') ? parseInt(searchParams.get('page')) : 1) - 1
completedJobs: {},
}
for (const item of queuedJobs) { const res = await fetch(location.href + '?format=json')
result.queuedJobs[item.uuid] = item const jobs = await res.json()
}
for (const item of completedJobs) { const result = {}
result.completedJobs[item.uuid] = item
for (const item of jobs) {
result[item.uuid] = item
} }
setJobStore(result) setJobStore(result)
// Setup SSE // Setup SSE
const es = new EventSource('/api/sse') const es = new EventSource(location.href + '?format=sse')
es.addEventListener('message', ({ data }) => { es.addEventListener('message', ({ data }) => {
const event = JSON.parse(data) const event = JSON.parse(data)
if (event.type === 'added') {
setJobStore(s => ({ setJobStore(s => ({
...s, ...s,
queuedJobs: {
...s.queuedJobs,
[event.job.uuid]: event.job, [event.job.uuid]: event.job,
},
})) }))
}
if (event.type === 'completed') {
setJobStore(s => ({
queuedJobs: {
..._.omit(s.queuedJobs, event.job.uuid),
},
completedJobs: {
...s.completedJobs,
[event.job.uuid]: event.job,
},
}))
}
}) })
}, []) }, [])
@ -95,18 +65,18 @@ export const JobsPage = ({}) => {
<> <>
<h2>Queued Jobs</h2> <h2>Queued Jobs</h2>
<div class="list"> <div class="list">
{Object.values(jobStore.queuedJobs) {Object.values(jobStore)
.toReversed() .filter(job => job.status !== 'completed')
.map(queuedJob => ( .map(job => (
<QueuedJob {...queuedJob} /> <QueuedJob {...job} />
))} ))}
</div> </div>
<h2>Completed Jobs</h2> <h2>Completed Jobs</h2>
<div class="list"> <div class="list">
{Object.values(jobStore.completedJobs) {Object.values(jobStore)
.toReversed() .filter(job => job.status === 'completed')
.map(completedJob => ( .map(job => (
<CompletedJob {...completedJob} /> <CompletedJob {...job} />
))} ))}
</div> </div>
</> </>

@ -1,11 +1,12 @@
import type { Deploy, GitDeploy, GitRef, ShellDeploy } from '@/config' import type { Deploy, GitDeploy, GitRef, ShellDeploy } from '@/config'
import type { Job, Worker } from '@/jobs' import type { DeployFunction, Job, JobBase } from '@/jobs'
import path from 'path' import path from 'path'
import { exists, normalizeURL, sleep } from '@/lib/utils' import { exists, normalizeURL, sleep } from '@/lib/utils'
import { runCommand } from '@/runners' import { type Runner } from '@/runners'
import { debug } from '@/logger' import { debug } from '@/logger'
const toSafePath = (target: string) => { const toSafePath = (target: string) => {
@ -28,29 +29,37 @@ function getDeployDirectory(deploy: GitDeploy): string {
return `${import.meta.env.CLONE_PATH ?? `${import.meta.env.DATA_PATH}/clone`}/${slug}` return `${import.meta.env.CLONE_PATH ?? `${import.meta.env.DATA_PATH}/clone`}/${slug}`
} }
async function cloneOrUpdateRepo(deploy: Deploy & { url: string; ref: GitRef }) { async function cloneOrUpdateRepo(runner: Runner, deploy: Deploy & { url: string; ref: GitRef }) {
const repoDir = getDeployDirectory(deploy) const repoDir = getDeployDirectory(deploy)
if (await exists(repoDir)) { if (await exists(repoDir)) {
await runCommand(`git -C "${repoDir}" pull`) await runner.command(`git -C "${repoDir}" pull`, {
silent: true,
})
} else { } else {
await runCommand(`mkdir -p "${repoDir}"`) await runner.command(`mkdir -p "${repoDir}"`, {
await runCommand(`git clone "${normalizeURL(deploy.url)}" "${repoDir}"`) silent: true,
})
await runner.command(`git clone "${normalizeURL(deploy.url)}" "${repoDir}"`, {
silent: true,
})
} }
if (deploy.ref.type !== 'default') { if (deploy.ref.type !== 'default') {
await runCommand(`git -C "${repoDir}" checkout "${deploy.ref.value}"`) await runner.command(`git -C "${repoDir}" checkout "${deploy.ref.value}"`, {
silent: true,
})
} }
} }
export async function shellRunner(deploy: ShellDeploy) { export async function shellRunner(runner: Runner, deploy: ShellDeploy) {
const { path, env } = deploy.options await cloneOrUpdateRepo(runner, deploy)
const repoDir = getDeployDirectory(deploy) const repoDir = getDeployDirectory(deploy)
const { path, env } = deploy.options
await cloneOrUpdateRepo(deploy) await runner.script(
[
const script = [
// mode to correct directory // mode to correct directory
`cd ${repoDir}`, `cd ${repoDir}`,
// append env variables // append env variables
@ -60,28 +69,27 @@ export async function shellRunner(deploy: ShellDeploy) {
// launch program // launch program
toSafePath(path ?? './deploy.sh'), toSafePath(path ?? './deploy.sh'),
].join('\n\n') ].join('\n\n')
)
await runCommand(script)
} }
export function createDeployJob(deploy: Deploy, submitter: any): Job & Worker { export function createDeployJob(deploy: Deploy, submitter: any): [JobBase, DeployFunction] {
return { return [
{
name: deploy.name, name: deploy.name,
submitter, submitter,
submittedAt: new Date(), },
async runner => {
async work() {
debug('[Runner]', `Deploying "${deploy.name}"`) debug('[Runner]', `Deploying "${deploy.name}"`)
await sleep(1000) await sleep(1000)
// TODO: Add other deploy types // TODO: Add other deploy types
if (deploy.type === 'shell') await shellRunner(deploy) if (deploy.type === 'shell') {
else { await shellRunner(runner, deploy)
} else {
throw new Error(`deploy type "${deploy.type}" not yet implemented`) throw new Error(`deploy type "${deploy.type}" not yet implemented`)
} }
debug('[Runner]', 'Finished deploy') debug('[Runner]', 'Finished deploy')
}, },
} ]
} }

@ -5,19 +5,218 @@ import { debug } from './logger'
import { createJsonDatabase } from './lib/file-db' import { createJsonDatabase } from './lib/file-db'
import { randomUUID } from 'crypto' import { randomUUID } from 'crypto'
import { createSimpleRunner, type Runner } from './runners'
import { createWriteStream } from 'fs'
import { readFile, mkdir } from 'fs/promises'
import { dirname } from 'path'
import type { random } from 'lodash'
type JobStatus = 'queued' | 'running' | 'completed'
export type Job = {
uuid: string
name: string
status: JobStatus
submitter: any
submittedAt: string
successful?: boolean
error?: any
startedAt?: string
completedAt?: string
}
export type DeployFunction = (runner: Runner) => Promise<void>
type QueuedJob = {
uuid: string
deployFn: DeployFunction
}
const emitter = new EventEmitter()
const jobsDB = createJsonDatabase<Job[]>(`${import.meta.env.DATA_PATH}/jobs.json`, [])
export function getJobLogFile(job: Job): string {
const timeHash = new Date(job.submittedAt).getTime()
// TODO: job.name is not safe
return `${import.meta.env.DATA_PATH}/logs/${job.name}_${timeHash}_${job.uuid.slice(0, 6)}.log`
}
// To ensure that the while loop inside "processQueue" is getting executed from
// only one call at a time.
let working = false
const queue: QueuedJob[] = []
async function processQueue() {
if (working) return
working = true
{
while (queue.length > 0) {
const { uuid, deployFn } = queue.shift()!
const job = await jobsDB.update(async jobs => {
const job = jobs.find(job => job.uuid === uuid)!
job.status = 'running'
return job
})
emitter.emit('job:started', job)
const startedAt = new Date().toISOString()
let error: string | undefined
const logsFilename = getJobLogFile(job)
await mkdir(dirname(logsFilename), { recursive: true })
const logsFile = createWriteStream(logsFilename, { flags: 'w' })
const runner = createSimpleRunner(line => {
logsFile.write(line + '\n')
// to not block log file creation
setImmediate(() => emitter.emit(`job:log:${uuid}`, line))
})
debug(`[Jobs] Starting job "${job.name}"`)
try {
await deployFn(runner)
} catch (e) {
error = e!.toString()
}
debug(`[Jobs] Finished job`)
const completedAt = new Date().toISOString()
const completedJob = await jobsDB.update(async jobs => {
const job = jobs.find(job => job.uuid === uuid)!
job.status = 'completed'
job.successful = error === undefined
job.error = error
job.startedAt = startedAt
job.completedAt = completedAt
return job
})
emitter.emit('job:completed', completedJob)
}
}
working = false
}
export type JobBase = {
name: string
submitter: any
}
// Use this function to add new jobs to the work queue
export async function enqueueJob(jobBase: JobBase, deployFn: DeployFunction) {
const uuid = randomUUID()
const job: Job = {
...jobBase,
uuid,
status: 'queued',
submittedAt: new Date().toISOString(),
}
await jobsDB.update(async jobs => {
jobs.push(job)
})
queue.push({ uuid, deployFn })
emitter.emit('job:add', job)
// starts concurrently a function to process jobs
processQueue()
}
export function getJobs(): Promise<Job[]> {
return jobsDB.load()
}
export async function getJob(uuid: string): Promise<Job> {
const jobs = await jobsDB.load()
const job = jobs.find(job => job.uuid === uuid)
if (!job) throw new Error(`no job with uuid "${uuid}"`)
return job
}
export async function getJobLogs(uuid: string): Promise<string> {
const job = await getJob(uuid)
try {
return await readFile(getJobLogFile(job), 'utf8')
} catch {
return ''
}
}
export const OnJobAdded = {
addListener(cb: (job: Job) => void) {
emitter.on('job:add', cb)
},
removeListener(cb: (job: Job) => void) {
emitter.off('job:add', cb)
},
}
export const OnJobCompleted = {
addListener(cb: (job: Job) => void) {
emitter.on('job:completed', cb)
},
removeListener(cb: (job: Job) => void) {
emitter.off('job:completed', cb)
},
}
export const OnJobStarted = {
addListener(cb: (job: Job) => void) {
emitter.on('job:started', cb)
},
removeListener(cb: (job: Job) => void) {
emitter.off('job:started', cb)
},
}
export const OnJobLog = {
addListener(uuid: string, cb: (line: string) => void) {
emitter.on(`job:log:${uuid}`, cb)
},
removeListener(uuid: string, cb: (line: string) => void) {
emitter.off(`job:log:${uuid}`, cb)
},
}
// ===================[ Old Version ]===================
/*
export type QueuedJob = { export type QueuedJob = {
uuid: string uuid: string
name: string name: string
status: JobStatus
submitter: any submitter: any
submittedAt: Date submittedAt: string
} }
export type CompletedJob = { export type CompletedJob = {
uuid: string uuid: string
name: string name: string
status: JobStatus
successful: boolean successful: boolean
error?: any error?: any
@ -30,22 +229,54 @@ export type CompletedJob = {
export type Job = { export type Job = {
name: string name: string
status: JobStatus
submitter: any
submittedAt: string
}
export type SubmittedJob = {
uuid: string
name: string
status: JobStatus
submitter: any submitter: any
submittedAt: Date submittedAt: string
successful?: boolean
error?: any
startedAt?: string
completedAt?: string
}
type JobBare = {
name: string
uuid: string
status: JobStatus
}
export function getJobLogFile(job: JobBare): string {
// TODO: job.name is not safe
return `${import.meta.env.DATA_PATH}/logs/${job.name}.${job.uuid}.log`
} }
export type Worker = { export type Worker = {
work: () => Promise<void> work: (runner: Runner) => Promise<void>
} }
// Event emitter & Queue (runtime only) // Event emitter & Queue (runtime only)
const emitter = new EventEmitter<{ const emitter = new EventEmitter<{
'job:add': [QueuedJob] 'job:add': [QueuedJob]
'job:completed': [CompletedJob] 'job:completed': [CompletedJob]
'job:log': [string]
}>() }>()
const queue: (QueuedJob & Worker)[] = [] const queue: (QueuedJob & Worker)[] = []
const runningJobs: Record<string, SubmittedJob> = {}
// Job db for logging purposes // Job db for logging purposes
const jobsDB = createJsonDatabase<CompletedJob[]>(`${import.meta.env.DATA_PATH}/jobs.json`, []) const jobsDB = createJsonDatabase<CompletedJob[]>(`${import.meta.env.DATA_PATH}/jobs.json`, [])
@ -64,9 +295,22 @@ async function processQueue() {
const startedAt = new Date().toISOString() const startedAt = new Date().toISOString()
let error: string | undefined let error: string | undefined
const logsFilename = getJobLogFile(job)
await mkdir(dirname(logsFilename), { recursive: true })
const logsFile = createWriteStream(logsFilename, { flags: 'w' })
const runner = createSimpleRunner(line => {
logsFile.write(line + '\n')
// to not block log file creation
setImmediate(() => emitter.emit('job:log', line))
})
debug(`[Jobs] Starting job "${job.name}"`) debug(`[Jobs] Starting job "${job.name}"`)
try { try {
await job.work() await job.work(runner)
} catch (e) { } catch (e) {
error = e!.toString() error = e!.toString()
} }
@ -74,15 +318,17 @@ async function processQueue() {
const completedAt = new Date().toISOString() const completedAt = new Date().toISOString()
const completedJob = { const completedJob: CompletedJob = {
uuid: job.uuid, uuid: job.uuid,
name: job.name, name: job.name,
status: 'completed',
successful: error === undefined, successful: error === undefined,
error, error,
submitter: job.submitter, submitter: job.submitter,
submittedAt: job.submittedAt.toISOString(), submittedAt: job.submittedAt,
startedAt, startedAt,
completedAt, completedAt,
} }
@ -97,9 +343,7 @@ async function processQueue() {
working = false working = false
} }
/** // Use this function to add new jobs to the work queue
* Use this function to add new jobs to the work queue
*/
export function enqueueJob(job: Job & Worker) { export function enqueueJob(job: Job & Worker) {
const queueJob = { ...job, uuid: randomUUID() } const queueJob = { ...job, uuid: randomUUID() }
@ -108,6 +352,7 @@ export function enqueueJob(job: Job & Worker) {
emitter.emit('job:add', { emitter.emit('job:add', {
uuid: queueJob.uuid, uuid: queueJob.uuid,
name: queueJob.name, name: queueJob.name,
status: 'queued',
submitter: queueJob.submitter, submitter: queueJob.submitter,
submittedAt: queueJob.submittedAt, submittedAt: queueJob.submittedAt,
}) })
@ -117,9 +362,10 @@ export function enqueueJob(job: Job & Worker) {
} }
export async function getQueuedJobs(): Promise<QueuedJob[]> { export async function getQueuedJobs(): Promise<QueuedJob[]> {
return queue.map(({ uuid, name, submitter, submittedAt }) => ({ return queue.map(({ uuid, name, status, submitter, submittedAt }) => ({
uuid, uuid,
name, name,
status,
submitter, submitter,
submittedAt, submittedAt,
})) }))
@ -129,6 +375,20 @@ export function getCompletedJobs(): Promise<CompletedJob[]> {
return jobsDB.load() return jobsDB.load()
} }
export async function getJob(uuid: string): Promise<SubmittedJob | undefined> {
const jobs = await jobsDB.load()
const job = jobs.find(job => job.uuid === uuid)
if (job) return job
const queuedJob = queue.find(job => job.uuid === uuid)
return queuedJob
}
export async function getJobLogs(job: JobBare): Promise<string> {
return await readFile(getJobLogFile(job), 'utf8')
}
export const OnJobAdded = { export const OnJobAdded = {
addListener(cb: (job: QueuedJob) => void) { addListener(cb: (job: QueuedJob) => void) {
emitter.on('job:add', cb) emitter.on('job:add', cb)
@ -146,3 +406,14 @@ export const OnJobCompleted = {
emitter.off('job:completed', cb) emitter.off('job:completed', cb)
}, },
} }
export const OnJobLog = {
addListener(cb: (line: string) => void) {
emitter.on('job:log', cb)
},
removeListener(cb: (line: string) => void) {
emitter.off('job:log', cb)
},
}
*/

@ -36,3 +36,48 @@ export function sleep(timeout: number) {
setTimeout(resolve, timeout) setTimeout(resolve, timeout)
}) })
} }
export class JsonResponse extends Response {
constructor(data: any) {
super(JSON.stringify(data), {
status: 200,
headers: {
'Content-Type': 'application/json',
},
})
}
}
export class JsonStreamResponse extends Response {
constructor(create: (send: (data: any) => void) => () => void | undefined) {
let cancelFn: () => void | undefined
const stream = new ReadableStream({
start(controller) {
const sendEvent = (data: any) => {
controller.enqueue(`data: ${JSON.stringify(data)}\r\n\r\n`)
}
cancelFn = create(sendEvent)
},
cancel() {
cancelFn?.()
},
})
super(stream, {
status: 200,
headers: {
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
},
})
}
}
export function createQueryUrl(base: string, options: Record<string, string>) {
return `${base}?${Object.entries(options)
.map(([k, v]) => k + '=' + encodeURIComponent(v))
.join('&')}`
}

@ -1,23 +0,0 @@
import { getCompletedJobs, getQueuedJobs } from '@/jobs'
import { debug } from '@/logger'
import type { APIRoute } from 'astro'
export const GET: APIRoute = async ({ request, params }) => {
debug('[API] Jobs:', params)
const queuedJobs = await getQueuedJobs()
const completedJobs = await getCompletedJobs()
return new Response(
JSON.stringify({
queuedJobs,
completedJobs,
}),
{
status: 200,
headers: {
'Content-Type': 'application/json',
},
}
)
}

@ -1,39 +0,0 @@
import { OnJobAdded, OnJobCompleted, type CompletedJob, type QueuedJob } from '@/jobs'
import { debug } from '@/logger'
import type { APIRoute } from 'astro'
export const GET: APIRoute = async ({ request }) => {
let jobAddedEvent: any
let jobCompletedEvent: any
const stream = new ReadableStream({
start(controller) {
const sendEvent = (data: any) => {
controller.enqueue(`data: ${JSON.stringify(data)}\r\n\r\n`)
}
jobAddedEvent = (job: QueuedJob) => sendEvent({ type: 'added', job })
jobCompletedEvent = (job: CompletedJob) => sendEvent({ type: 'completed', job })
debug('[SSE] Registering client')
OnJobAdded.addListener(jobAddedEvent)
OnJobCompleted.addListener(jobCompletedEvent)
},
cancel() {
OnJobAdded.removeListener(jobAddedEvent)
OnJobCompleted.removeListener(jobCompletedEvent)
debug('[SSE] Un-registered client')
},
})
return new Response(stream, {
status: 200,
headers: {
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
},
})
}

@ -20,7 +20,9 @@ export const POST: APIRoute = async ({ request }) => {
if (deploy.type !== 'docker') { if (deploy.type !== 'docker') {
if (URLS.includes(normalizeURL(deploy.url))) { if (URLS.includes(normalizeURL(deploy.url))) {
debug(`[Webhook] Triggering deploy for "${deploy.url}"`) debug(`[Webhook] Triggering deploy for "${deploy.url}"`)
enqueueJob(createDeployJob(deploy, { event: 'webhook', url: deploy.url }))
const [jobBase, deployFn] = createDeployJob(deploy, { event: 'webhook', url: deploy.url })
await enqueueJob(jobBase, deployFn)
} }
} }
} }

@ -0,0 +1,12 @@
---
import Layout from '@layouts/Layout.astro'
const message = Astro.url.searchParams.get('message')
const previous = Astro.url.searchParams.get('previous')
---
<Layout title="Error | phCD">
<h1>Error</h1>
<p>{message ?? 'Unknown error'}</p>
{previous && <a href={previous}>Back</a>}
</Layout>

@ -0,0 +1,35 @@
---
import { getJob, getJobLogs, OnJobLog, type QueuedJob } from '@/jobs'
import Layout from '@layouts/Layout.astro'
import { JobLogs } from '@client/JobLogs'
import { JsonResponse, createQueryUrl } from '@/lib/utils'
const { uuid } = Astro.params
const job = await getJob(uuid!)
if (!job) {
return Astro.redirect(
createQueryUrl('/error', {
message: `No job with uuid "${uuid}"`,
previous: `/jobs`,
}),
)
}
let logsContent
if (job.status === 'completed') {
logsContent = await getJobLogs(uuid)
}
---
<Layout title="{`${job.name}" | Jobs | phCD`}>
<h1>Job "{job.name}"</h1>
<pre><code>{JSON.stringify(job, null, 2)}</code></pre>
<h2>Logs</h2>
{job.status === 'completed' ?
<pre><code>{logsContent}</code></pre>
: <JobLogs client:load />}
</Layout>

@ -0,0 +1,35 @@
import { OnJobLog, getJob, getJobLogs } from '@/jobs'
import { JsonResponse, JsonStreamResponse, createQueryUrl } from '@/lib/utils'
import { debug } from '@/logger'
import type { APIRoute } from 'astro'
export const GET: APIRoute = async ({ request, params: { uuid }, url, redirect }) => {
const rawLogs = await getJobLogs(uuid!)
const format = url.searchParams.get('format')
switch (format) {
case 'raw':
return new Response(rawLogs)
case 'json':
return new JsonResponse(rawLogs.trim().split('\n'))
case 'sse':
return new JsonStreamResponse(sendData => {
const jobLog = (content: string) => sendData({ type: 'log', content })
debug('[SSE] Registering job log client')
OnJobLog.addListener(uuid!, jobLog)
return () => {
// cancel
OnJobLog.removeListener(uuid!, jobLog)
debug('[SSE] Un-registered job log client')
}
})
default:
return redirect(
createQueryUrl('/error', {
message: `Invalid format "${format}"`,
})
)
}
}

@ -1,7 +1,62 @@
--- ---
import Layout from '@layouts/Layout.astro' import Layout from '@layouts/Layout.astro'
import { JobsPage } from '@client/JobsPage'
import { JobsPage } from '@client/JobsPage.jsx' import { JsonResponse, JsonStreamResponse } from '@/lib/utils'
import {
getJobs,
OnJobAdded,
OnJobCompleted,
OnJobStarted,
} from '@/jobs'
import { debug } from '@/logger'
const searchParams = Astro.url.searchParams
if (searchParams.has('format')) {
const format = searchParams.get('format')
switch (format) {
case 'json':
const limit: number = searchParams.has('limit')
? parseInt(searchParams.get('limit')!)
: 10
const offset: number =
(searchParams.has('page') ? parseInt(searchParams.get('page')!) : 1) - 1
const jobs = await getJobs()
return new JsonResponse(
jobs
.toReversed()
.slice(offset * (limit + 1), (offset + 1) * (limit + 1))
)
case 'sse':
return new JsonStreamResponse(sendData => {
const jobChange = (job: Job) => sendData({ type: 'change', job })
debug('[SSE] Registering client')
OnJobAdded.addListener(jobChange)
OnJobCompleted.addListener(jobChange)
OnJobStarted.addListener(jobChange)
return () => {
// cancel
OnJobAdded.removeListener(jobChange)
OnJobCompleted.removeListener(jobChange)
OnJobStarted.removeListener(jobChange)
debug('[SSE] Un-registered client')
}
})
default:
return Astro.redirect(
createQueryUrl('/error', {
message: `Invalid format "${format}"`,
}),
)
}
}
--- ---
<Layout title="Deploys | phCD"> <Layout title="Deploys | phCD">

@ -2,6 +2,15 @@ import child_process from 'child_process'
import { debug } from './logger' import { debug } from './logger'
type RunnerLogOptions = {
silent?: boolean
}
export type Runner = {
script: (source: string, options?: RunnerLogOptions) => Promise<void>
command: (source: string, options?: RunnerLogOptions) => Promise<void>
}
function onLine(stream: any, cb: (line: string) => void) { function onLine(stream: any, cb: (line: string) => void) {
let buffer = '' let buffer = ''
@ -17,25 +26,20 @@ function onLine(stream: any, cb: (line: string) => void) {
}) })
} }
export async function runCommand(source: string): Promise<void> { export async function runShell(
if (!source.includes('\n')) { source: string,
debug('[Runner] Command:') logLn: ((s: string) => void) | undefined
debug('[Runner]', ' |', source) ): Promise<void> {
} else {
debug('[Runner] Script:')
source.split('\n').forEach(line => {
debug('[Runner]', ' |', line)
})
}
const child = child_process.exec(source) const child = child_process.exec(source)
onLine(child.stdout!, line => { onLine(child.stdout!, line => {
debug('[Runner]', ' >', line) debug('[Runner]', ' >', line)
logLn?.(line)
}) })
onLine(child.stderr!, line => { onLine(child.stderr!, line => {
debug('[Runner]', '!>', line) debug('[Runner]', '!>', line)
logLn?.(line)
}) })
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
@ -48,3 +52,22 @@ export async function runCommand(source: string): Promise<void> {
}) })
}) })
} }
export function createSimpleRunner(logLn: (s: string) => void): Runner {
return {
async command(source: string, { silent } = {}) {
debug('[Runner] Command:')
debug('[Runner]', ' |', source)
await runShell(source, silent ? undefined : logLn)
},
async script(source: string, { silent } = {}) {
debug('[Runner] Script:')
source.split('\n').forEach(line => {
debug('[Runner]', ' |', line)
})
await runShell(source, silent ? undefined : logLn)
},
}
}

@ -57,6 +57,20 @@ code {
line-height: 1; line-height: 1;
} }
pre {
background: #f0f0f0;
line-height: 1.5;
padding: 0.5rem;
border-radius: 0.5rem;
width: 100%;
& > code {
padding: 0;
}
}
code { code {
background: #f0f0f0; background: #f0f0f0;
padding: 0 0.25rem; padding: 0 0.25rem;
@ -428,6 +442,14 @@ body {
grid-template-columns: 1fr; grid-template-columns: 1fr;
grid-row: auto auto; grid-row: auto auto;
gap: 0.5rem;
cursor: pointer;
&.running {
background: #dfd;
}
& > .name { & > .name {
grid-column: span 3; grid-column: span 3;
@ -444,9 +466,14 @@ body {
& .submitted-at { & .submitted-at {
font-size: 15px; font-size: 15px;
} }
& .delta { & .delta {
font-size: 15px; font-size: 15px;
} }
&:hover {
background: $accent-500;
}
} }
} }
} }

Loading…
Cancel
Save