feat: working jobs page with sse
parent
28b16aee7a
commit
e87a9bef18
@ -1 +1,2 @@
|
||||
DATA_PATH=./data.local
|
||||
CONFIG_PATH=config.yaml
|
@ -1,26 +1,21 @@
|
||||
# build output
|
||||
# Build Output
|
||||
dist/
|
||||
# generated types
|
||||
.astro/
|
||||
|
||||
# dependencies
|
||||
# Node JS
|
||||
node_modules/
|
||||
*.log*
|
||||
|
||||
# logs
|
||||
npm-debug.log*
|
||||
yarn-debug.log*
|
||||
yarn-error.log*
|
||||
pnpm-debug.log*
|
||||
|
||||
|
||||
# environment variables
|
||||
# Environment Variables
|
||||
.env
|
||||
.env.production
|
||||
|
||||
# macOS-specific files
|
||||
.DS_Store
|
||||
|
||||
# editors
|
||||
# Editors
|
||||
.vscode
|
||||
|
||||
# Locals
|
||||
*.local*
|
||||
config.yaml
|
||||
|
||||
# macOS-specific files
|
||||
.DS_Store
|
@ -1,35 +1,39 @@
|
||||
{
|
||||
"name": "phcd",
|
||||
"type": "module",
|
||||
"version": "0.0.1",
|
||||
"scripts": {
|
||||
"dev": "astro dev",
|
||||
"start": "astro dev",
|
||||
"build": "astro check && astro build",
|
||||
"preview": "astro preview",
|
||||
"astro": "astro"
|
||||
},
|
||||
"dependencies": {
|
||||
"@astrojs/check": "^0.5.2",
|
||||
"@astrojs/node": "^8.2.0",
|
||||
"@astrojs/preact": "^3.1.0",
|
||||
"@fontsource-variable/material-symbols-outlined": "^5.0.22",
|
||||
"@fontsource/jetbrains-mono": "^5.0.18",
|
||||
"@fontsource/lato": "^5.0.18",
|
||||
"astro": "^4.3.5",
|
||||
"async-mutex": "^0.4.1",
|
||||
"dockerode": "^4.0.2",
|
||||
"js-yaml": "^4.1.0",
|
||||
"lodash": "^4.17.21",
|
||||
"nodegit": "^0.27.0",
|
||||
"preact": "^10.19.4",
|
||||
"typescript": "^5.3.3",
|
||||
"zod": "^3.22.4"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/dockerode": "^3.3.23",
|
||||
"@types/js-yaml": "^4.0.9",
|
||||
"@types/lodash": "^4.14.202",
|
||||
"sass": "^1.70.0"
|
||||
}
|
||||
"name": "phcd",
|
||||
"type": "module",
|
||||
"version": "0.0.1",
|
||||
"scripts": {
|
||||
"dev": "astro dev",
|
||||
"start": "astro dev",
|
||||
"build": "astro sync && astro check && astro build",
|
||||
"preview": "astro preview",
|
||||
"astro": "astro"
|
||||
},
|
||||
"dependencies": {
|
||||
"@astrojs/check": "^0.5.6",
|
||||
"@astrojs/node": "^8.2.3",
|
||||
"@astrojs/preact": "^3.1.1",
|
||||
"@fontsource-variable/material-symbols-outlined": "^5.0.24",
|
||||
"@fontsource/jetbrains-mono": "^5.0.19",
|
||||
"@fontsource/lato": "^5.0.19",
|
||||
"astro": "^4.4.11",
|
||||
"async-mutex": "^0.4.1",
|
||||
"dockerode": "^4.0.2",
|
||||
"js-yaml": "^4.1.0",
|
||||
"lodash": "^4.17.21",
|
||||
"nodegit": "^0.27.0",
|
||||
"preact": "^10.19.6",
|
||||
"typescript": "^5.3.3",
|
||||
"zod": "^3.22.4"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/babel-generator": "^6.25.8",
|
||||
"@types/babel__core": "^7.20.5",
|
||||
"@types/dockerode": "^3.3.24",
|
||||
"@types/js-yaml": "^4.0.9",
|
||||
"@types/lodash": "^4.14.202",
|
||||
"@types/node-fetch": "^2.6.11",
|
||||
"node-fetch": "^3.3.2",
|
||||
"sass": "^1.71.1"
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,75 @@
|
||||
import fetch from 'node-fetch'
|
||||
|
||||
const base = process.argv[2] ?? 'https://git.example.org/example/foo'
|
||||
|
||||
await fetch('http://localhost:4321/api/webhook', {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({
|
||||
secret: '3gEsCfjlV2ugRwgpU#w1*WaW*wa4NXgGmpCfkbG3',
|
||||
ref: 'refs/heads/main',
|
||||
before: '28e1879d029cb852e4844d9c718537df08844e03',
|
||||
after: 'bffeb74224043ba2feb48d137756c8a9331c449a',
|
||||
compare_url: `${base}/compare/28e1879d029cb852e4844d9c718537df08844e03...bffeb74224043ba2feb48d137756c8a9331c449a`,
|
||||
commits: [
|
||||
{
|
||||
id: 'bffeb74224043ba2feb48d137756c8a9331c449a',
|
||||
message: 'Webhooks Yay!',
|
||||
url: `${base}/commit/bffeb74224043ba2feb48d137756c8a9331c449a`,
|
||||
author: {
|
||||
name: 'Gitea',
|
||||
email: 'someone@gitea.io',
|
||||
username: 'gitea',
|
||||
},
|
||||
committer: {
|
||||
name: 'Gitea',
|
||||
email: 'someone@gitea.io',
|
||||
username: 'gitea',
|
||||
},
|
||||
timestamp: '2017-03-13T13:52:11-04:00',
|
||||
},
|
||||
],
|
||||
repository: {
|
||||
id: 140,
|
||||
owner: {
|
||||
id: 1,
|
||||
login: 'gitea',
|
||||
full_name: 'Gitea',
|
||||
email: 'someone@gitea.io',
|
||||
avatar_url: `${base}/avatars/1`,
|
||||
username: 'gitea',
|
||||
},
|
||||
name: 'foo',
|
||||
full_name: 'example/foo',
|
||||
description: 'An example repo',
|
||||
private: false,
|
||||
fork: false,
|
||||
html_url: `${base}`,
|
||||
ssh_url: 'ssh://gitea@git.example.org.git',
|
||||
clone_url: `${base}.git`,
|
||||
website: '',
|
||||
stars_count: 0,
|
||||
forks_count: 1,
|
||||
watchers_count: 1,
|
||||
open_issues_count: 7,
|
||||
default_branch: 'master',
|
||||
created_at: '2017-02-26T04:29:06-05:00',
|
||||
updated_at: '2017-03-13T13:51:58-04:00',
|
||||
},
|
||||
pusher: {
|
||||
id: 1,
|
||||
login: 'gitea',
|
||||
full_name: 'Gitea',
|
||||
email: 'someone@gitea.io',
|
||||
avatar_url: `${base}/avatars/1`,
|
||||
username: 'gitea',
|
||||
},
|
||||
sender: {
|
||||
id: 1,
|
||||
login: 'gitea',
|
||||
full_name: 'Gitea',
|
||||
email: 'someone@gitea.io',
|
||||
avatar_url: `${base}/avatars/1`,
|
||||
username: 'gitea',
|
||||
},
|
||||
}),
|
||||
})
|
@ -0,0 +1,114 @@
|
||||
import _ from 'lodash'
|
||||
import { useEffect, useState } from 'preact/hooks'
|
||||
|
||||
import { durationToString } from './lib/utils'
|
||||
|
||||
/**
|
||||
* @param {import('@/jobs.ts').QueuedJob} props
|
||||
*/
|
||||
export const QueuedJob = ({ uuid, name, submitter, submittedAt }) => (
|
||||
<div class="job queued" title={uuid}>
|
||||
<div class="name">{name}</div>
|
||||
<div class="footer">
|
||||
<div class="submitted-at">{new Date(submittedAt).toLocaleString()}</div>
|
||||
</div>
|
||||
</div>
|
||||
)
|
||||
|
||||
/**
|
||||
* @param {import('@/jobs.ts').CompletedJob} props
|
||||
*/
|
||||
export const CompletedJob = ({
|
||||
uuid,
|
||||
name,
|
||||
submitter,
|
||||
submittedAt,
|
||||
startedAt,
|
||||
completedAt,
|
||||
successful,
|
||||
error,
|
||||
}) => {
|
||||
return (
|
||||
<div class="job completed" title={uuid}>
|
||||
<div class="name">{name}</div>
|
||||
<div class="footer">
|
||||
<div class="submitted-at">{new Date(submittedAt).toLocaleString()}</div>
|
||||
<div class="delta">{durationToString(startedAt, completedAt)}</div>
|
||||
</div>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
export const JobsPage = ({}) => {
|
||||
const [jobStore, setJobStore] = useState({
|
||||
queuedJobs: {},
|
||||
completedJobs: {},
|
||||
})
|
||||
|
||||
useEffect(async () => {
|
||||
const res = await fetch('/api/jobs')
|
||||
const { queuedJobs, completedJobs } = await res.json()
|
||||
|
||||
const result = {
|
||||
queuedJobs: {},
|
||||
completedJobs: {},
|
||||
}
|
||||
|
||||
for (const item of queuedJobs) {
|
||||
result.queuedJobs[item.uuid] = item
|
||||
}
|
||||
|
||||
for (const item of completedJobs) {
|
||||
result.completedJobs[item.uuid] = item
|
||||
}
|
||||
|
||||
setJobStore(result)
|
||||
|
||||
// Setup SSE
|
||||
const es = new EventSource('/api/sse')
|
||||
es.addEventListener('message', ({ data }) => {
|
||||
const event = JSON.parse(data)
|
||||
if (event.type === 'added') {
|
||||
setJobStore(s => ({
|
||||
...s,
|
||||
queuedJobs: {
|
||||
...s.queuedJobs,
|
||||
[event.job.uuid]: event.job,
|
||||
},
|
||||
}))
|
||||
}
|
||||
if (event.type === 'completed') {
|
||||
setJobStore(s => ({
|
||||
queuedJobs: {
|
||||
..._.omit(s.queuedJobs, event.job.uuid),
|
||||
},
|
||||
completedJobs: {
|
||||
...s.completedJobs,
|
||||
[event.job.uuid]: event.job,
|
||||
},
|
||||
}))
|
||||
}
|
||||
})
|
||||
}, [])
|
||||
|
||||
return (
|
||||
<>
|
||||
<h2>Queued Jobs</h2>
|
||||
<div class="list">
|
||||
{Object.values(jobStore.queuedJobs)
|
||||
.toReversed()
|
||||
.map(queuedJob => (
|
||||
<QueuedJob {...queuedJob} />
|
||||
))}
|
||||
</div>
|
||||
<h2>Completed Jobs</h2>
|
||||
<div class="list">
|
||||
{Object.values(jobStore.completedJobs)
|
||||
.toReversed()
|
||||
.map(completedJob => (
|
||||
<CompletedJob {...completedJob} />
|
||||
))}
|
||||
</div>
|
||||
</>
|
||||
)
|
||||
}
|
@ -0,0 +1,233 @@
|
||||
function inspectZod(schema, path = []) {
|
||||
if ('typeName' in schema._def) {
|
||||
debug(' '.repeat(path.length), path.at(-1) ?? '<root>', '::', schema._def.typeName)
|
||||
if (schema._def.typeName === 'ZodUnion') {
|
||||
schema._def.options.forEach(option => inspectZod(option, [...path, '<union>']))
|
||||
return
|
||||
}
|
||||
if (schema._def.typeName === 'ZodObject') {
|
||||
Object.entries(schema._def.shape()).forEach(([k, v]) => {
|
||||
inspectZod(v, [...path, k])
|
||||
})
|
||||
return
|
||||
}
|
||||
if (schema._def.typeName === 'ZodTuple') {
|
||||
schema._def.items.forEach((item, i) => {
|
||||
inspectZod(item, [...path, i])
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
if (schema._def.typeName === 'ZodArray') {
|
||||
inspectZod(schema._def.type, [...path, '<index>'])
|
||||
|
||||
return
|
||||
}
|
||||
if (schema._def.typeName === 'ZodOptional') {
|
||||
inspectZod(schema._def.innerType, [...path, '?'])
|
||||
|
||||
return
|
||||
}
|
||||
if (schema._def.typeName === 'ZodLiteral') {
|
||||
debug(' '.repeat(path.length + 1), schema._def)
|
||||
|
||||
return
|
||||
}
|
||||
if (schema._def.typeName === 'ZodString') {
|
||||
return
|
||||
}
|
||||
if (schema._def.typeName === 'ZodRecord') {
|
||||
inspectZod(schema._def.keyType, [...path, '<key>'])
|
||||
inspectZod(schema._def.valueType, [...path, '<value>'])
|
||||
return
|
||||
}
|
||||
debug('_def:', schema._def)
|
||||
}
|
||||
}
|
||||
|
||||
const ZodField = ({ value, setValue, schema, path }) => {
|
||||
if (schema._def.typeName === 'ZodString') {
|
||||
return (
|
||||
<div>
|
||||
<label>{path.at(-1)}</label>
|
||||
<input value={value} onInput={e => setValue(path, e.target.value)} />
|
||||
</div>
|
||||
)
|
||||
}
|
||||
if (schema._def.typeName === 'ZodNumber') {
|
||||
return (
|
||||
<div>
|
||||
<label>{path.at(-1)}</label>
|
||||
<input value={value} onInput={e => setValue(path, e.target.value)} />
|
||||
</div>
|
||||
)
|
||||
}
|
||||
if (schema._def.typeName === 'ZodBoolean') {
|
||||
return (
|
||||
<div>
|
||||
<label>{path.at(-1)}</label>
|
||||
<input
|
||||
type="checkbox"
|
||||
checked={value}
|
||||
onInput={e => setValue(path, e.target.checked)}
|
||||
/>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
if (schema._def.typeName === 'ZodDate') {
|
||||
return (
|
||||
<div>
|
||||
<label>{path.at(-1)}</label>
|
||||
<input type="date" value={value} onInput={e => setValue(path, e.target.value)} />
|
||||
</div>
|
||||
)
|
||||
}
|
||||
if (schema._def.typeName === 'ZodObject') {
|
||||
return (
|
||||
<div>
|
||||
{Object.entries(schema._def.shape()).map(([k, v]) => {
|
||||
return (
|
||||
<ZodField
|
||||
value={value[k]}
|
||||
setValue={setValue}
|
||||
schema={v}
|
||||
path={[...path, k]}
|
||||
/>
|
||||
)
|
||||
})}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
if (schema._def.typeName === 'ZodArray') {
|
||||
return (
|
||||
<div>
|
||||
{value.map((v, i) => {
|
||||
return (
|
||||
<ZodField
|
||||
value={v}
|
||||
setValue={setValue}
|
||||
schema={schema._def.type}
|
||||
path={[...path, i]}
|
||||
/>
|
||||
)
|
||||
})}
|
||||
<button onClick={() => setValue(path, [...value, ''])}>Add</button>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
if (schema._def.typeName === 'ZodOptional') {
|
||||
return (
|
||||
<div>
|
||||
<ZodField
|
||||
value={value}
|
||||
setValue={setValue}
|
||||
schema={schema._def.innerType}
|
||||
path={path}
|
||||
/>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
if (schema._def.typeName === 'ZodLiteral') {
|
||||
return (
|
||||
<div>
|
||||
<ZodField value={value} setValue={setValue} schema={schema} path={path} />
|
||||
</div>
|
||||
)
|
||||
}
|
||||
if (schema._def.typeName === 'ZodUnion') {
|
||||
return (
|
||||
<div>
|
||||
<select value={value} onInput={e => setValue(path, e.target.value)}>
|
||||
{schema._def.options.map(option => {
|
||||
return <option value={option._def.value}>{option._def.value}</option>
|
||||
})}
|
||||
</select>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
const ZodObject = ({ value, setValue, schema, path }) => {
|
||||
return (
|
||||
<div>
|
||||
{Object.entries(schema._def.shape()).map(([k, v]) => {
|
||||
return (
|
||||
<ZodField value={value[k]} setValue={setValue} schema={v} path={[...path, k]} />
|
||||
)
|
||||
})}
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
const ZodArray = ({ value, setValue, schema, path }) => {
|
||||
return (
|
||||
<div>
|
||||
{value.map((v, i) => {
|
||||
return (
|
||||
<ZodField
|
||||
value={v}
|
||||
setValue={setValue}
|
||||
schema={schema._def.type}
|
||||
path={[...path, i]}
|
||||
/>
|
||||
)
|
||||
})}
|
||||
<button onClick={() => setValue(path, [...value, ''])}>Add</button>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
const ZodOptional = ({ value, setValue, schema, path }) => {
|
||||
return (
|
||||
<div>
|
||||
<ZodField
|
||||
value={value}
|
||||
setValue={setValue}
|
||||
schema={schema._def.innerType}
|
||||
path={path}
|
||||
/>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
const ZodLiteral = ({ value, setValue, schema, path }) => {
|
||||
return (
|
||||
<div>
|
||||
<ZodField value={value} setValue={setValue} schema={schema} path={path} />
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
const ZodUnion = ({ value, setValue, schema, path }) => {
|
||||
return (
|
||||
<div>
|
||||
<select value={value} onInput={e => setValue(path, e.target.value)}>
|
||||
{schema._def.options.map(option => {
|
||||
return <option value={option._def.value}>{option._def.value}</option>
|
||||
})}
|
||||
</select>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
export const ZodForm = ({ value, setValue, schema }) => {
|
||||
return (
|
||||
<div>
|
||||
{schema._def.typeName === 'ZodObject' && (
|
||||
<ZodObject value={value} setValue={setValue} schema={schema} path={[]} />
|
||||
)}
|
||||
{schema._def.typeName === 'ZodArray' && (
|
||||
<ZodArray value={value} setValue={setValue} schema={schema} path={[]} />
|
||||
)}
|
||||
{schema._def.typeName === 'ZodOptional' && (
|
||||
<ZodOptional value={value} setValue={setValue} schema={schema} path={[]} />
|
||||
)}
|
||||
{schema._def.typeName === 'ZodLiteral' && (
|
||||
<ZodLiteral value={value} setValue={setValue} schema={schema} path={[]} />
|
||||
)}
|
||||
{schema._def.typeName === 'ZodUnion' && (
|
||||
<ZodUnion value={value} setValue={setValue} schema={schema} path={[]} />
|
||||
)}
|
||||
</div>
|
||||
)
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
export function durationToString(from, to) {
|
||||
from = new Date(from)
|
||||
to = new Date(to)
|
||||
|
||||
let s = to.getTime() - from.getTime()
|
||||
|
||||
const millis = s % 1000
|
||||
s = (s - millis) / 1000
|
||||
if (s === 0) return `${millis}ms`
|
||||
|
||||
const seconds = s % 60
|
||||
s = (s - seconds) / 60
|
||||
if (s === 0) return `${seconds}s${millis}ms`
|
||||
|
||||
const minutes = s % 60
|
||||
s = (s - minutes) / 60
|
||||
if (s === 0) return `${minutes}s${seconds}s`
|
||||
|
||||
const hours = s
|
||||
return `${hours}h${minutes}m`
|
||||
}
|
@ -0,0 +1,87 @@
|
||||
import type { Deploy, GitDeploy, GitRef, ShellDeploy } from '@/config'
|
||||
import type { Job, Worker } from '@/jobs'
|
||||
|
||||
import path from 'path'
|
||||
|
||||
import { exists, normalizeURL, sleep } from '@/lib/utils'
|
||||
|
||||
import { runCommand } from '@/runners'
|
||||
import { debug } from '@/logger'
|
||||
|
||||
const toSafePath = (target: string) => {
|
||||
return '.' + path.posix.normalize('/' + target)
|
||||
}
|
||||
|
||||
function getDeployDirectory(deploy: GitDeploy): string {
|
||||
const { url, ref } = deploy
|
||||
|
||||
const repoSlug = url
|
||||
.replace(/(^\w+:|^)\/\//, '') // strip protocol
|
||||
.replace(/[^a-zA-Z]+/g, '-') // only keep letters, other symbols become dashes
|
||||
.replace(/^\-|\-$/g, '') // remove leading or trailing dashes
|
||||
|
||||
const slug =
|
||||
ref.type === 'default'
|
||||
? `${deploy.name}_${repoSlug}`
|
||||
: `${deploy.name}_${repoSlug}@${ref.value}`
|
||||
|
||||
return `${import.meta.env.CLONE_PATH ?? `${import.meta.env.DATA_PATH}/clone`}/${slug}`
|
||||
}
|
||||
|
||||
async function cloneOrUpdateRepo(deploy: Deploy & { url: string; ref: GitRef }) {
|
||||
const repoDir = getDeployDirectory(deploy)
|
||||
|
||||
if (await exists(repoDir)) {
|
||||
await runCommand(`git -C "${repoDir}" pull`)
|
||||
} else {
|
||||
await runCommand(`mkdir -p "${repoDir}"`)
|
||||
await runCommand(`git clone "${normalizeURL(deploy.url)}" "${repoDir}"`)
|
||||
}
|
||||
|
||||
if (deploy.ref.type !== 'default') {
|
||||
await runCommand(`git -C "${repoDir}" checkout "${deploy.ref.value}"`)
|
||||
}
|
||||
}
|
||||
|
||||
export async function shellRunner(deploy: ShellDeploy) {
|
||||
const { path, env } = deploy.options
|
||||
|
||||
const repoDir = getDeployDirectory(deploy)
|
||||
|
||||
await cloneOrUpdateRepo(deploy)
|
||||
|
||||
const script = [
|
||||
// mode to correct directory
|
||||
`cd ${repoDir}`,
|
||||
// append env variables
|
||||
Object.entries(env ?? {})
|
||||
.map(([key, value]) => `export ${key}="${value.replace(/"/g, '\\"')}"`)
|
||||
.join('\n'),
|
||||
// launch program
|
||||
toSafePath(path ?? './deploy.sh'),
|
||||
].join('\n\n')
|
||||
|
||||
await runCommand(script)
|
||||
}
|
||||
|
||||
export function createDeployJob(deploy: Deploy, submitter: any): Job & Worker {
|
||||
return {
|
||||
name: deploy.name,
|
||||
|
||||
submitter,
|
||||
submittedAt: new Date(),
|
||||
|
||||
async work() {
|
||||
debug('[Runner]', `Deploying "${deploy.name}"`)
|
||||
await sleep(1000)
|
||||
|
||||
// TODO: Add other deploy types
|
||||
if (deploy.type === 'shell') await shellRunner(deploy)
|
||||
else {
|
||||
throw new Error(`deploy type "${deploy.type}" not yet implemented`)
|
||||
}
|
||||
|
||||
debug('[Runner]', 'Finished deploy')
|
||||
},
|
||||
}
|
||||
}
|
@ -1,28 +1,148 @@
|
||||
const queue: (() => Promise<void>)[] = []
|
||||
import { EventEmitter } from 'events'
|
||||
|
||||
// to ensure that the while loop inside triggerProcessQueue is getting executed from only one call at a time
|
||||
let working = false
|
||||
import { debug } from './logger'
|
||||
|
||||
import { createJsonDatabase } from './lib/file-db'
|
||||
|
||||
import { randomUUID } from 'crypto'
|
||||
|
||||
export type QueuedJob = {
|
||||
uuid: string
|
||||
name: string
|
||||
|
||||
submitter: any
|
||||
submittedAt: Date
|
||||
}
|
||||
|
||||
export type CompletedJob = {
|
||||
uuid: string
|
||||
name: string
|
||||
|
||||
successful: boolean
|
||||
error?: any
|
||||
|
||||
submitter: any
|
||||
submittedAt: string
|
||||
startedAt: string
|
||||
completedAt: string
|
||||
}
|
||||
|
||||
export type Job = {
|
||||
name: string
|
||||
|
||||
export function runPendingJobs() {
|
||||
triggerProcessQueue()
|
||||
submitter: any
|
||||
submittedAt: Date
|
||||
}
|
||||
|
||||
async function triggerProcessQueue() {
|
||||
export type Worker = {
|
||||
work: () => Promise<void>
|
||||
}
|
||||
|
||||
// Event emitter & Queue (runtime only)
|
||||
const emitter = new EventEmitter<{
|
||||
'job:add': [QueuedJob]
|
||||
'job:completed': [CompletedJob]
|
||||
}>()
|
||||
|
||||
const queue: (QueuedJob & Worker)[] = []
|
||||
|
||||
// Job db for logging purposes
|
||||
const jobsDB = createJsonDatabase<CompletedJob[]>(`${import.meta.env.DATA_PATH}/jobs.json`, [])
|
||||
|
||||
// To ensure that the while loop inside "processQueue" is getting executed from
|
||||
// only one call at a time.
|
||||
let working = false
|
||||
|
||||
async function processQueue() {
|
||||
if (working) return
|
||||
|
||||
working = true
|
||||
{
|
||||
while (queue.length > 0) {
|
||||
const job = queue.shift()!
|
||||
await job()
|
||||
|
||||
const startedAt = new Date().toISOString()
|
||||
let error: string | undefined
|
||||
|
||||
debug(`[Jobs] Starting job "${job.name}"`)
|
||||
try {
|
||||
await job.work()
|
||||
} catch (e) {
|
||||
error = e!.toString()
|
||||
}
|
||||
debug(`[Jobs] Finished job`)
|
||||
|
||||
const completedAt = new Date().toISOString()
|
||||
|
||||
const completedJob = {
|
||||
uuid: job.uuid,
|
||||
name: job.name,
|
||||
|
||||
successful: error === undefined,
|
||||
error,
|
||||
|
||||
submitter: job.submitter,
|
||||
submittedAt: job.submittedAt.toISOString(),
|
||||
startedAt,
|
||||
completedAt,
|
||||
}
|
||||
|
||||
await jobsDB.update(async jobs => {
|
||||
jobs.push(completedJob)
|
||||
})
|
||||
|
||||
emitter.emit('job:completed', completedJob)
|
||||
}
|
||||
}
|
||||
working = false
|
||||
}
|
||||
|
||||
export function addJob(job: () => Promise<void>) {
|
||||
queue.push(job)
|
||||
/**
|
||||
* Use this function to add new jobs to the work queue
|
||||
*/
|
||||
export function enqueueJob(job: Job & Worker) {
|
||||
const queueJob = { ...job, uuid: randomUUID() }
|
||||
|
||||
queue.push(queueJob)
|
||||
|
||||
emitter.emit('job:add', {
|
||||
uuid: queueJob.uuid,
|
||||
name: queueJob.name,
|
||||
submitter: queueJob.submitter,
|
||||
submittedAt: queueJob.submittedAt,
|
||||
})
|
||||
|
||||
// starts concurrently a function to process jobs
|
||||
triggerProcessQueue()
|
||||
processQueue()
|
||||
}
|
||||
|
||||
export async function getQueuedJobs(): Promise<QueuedJob[]> {
|
||||
return queue.map(({ uuid, name, submitter, submittedAt }) => ({
|
||||
uuid,
|
||||
name,
|
||||
submitter,
|
||||
submittedAt,
|
||||
}))
|
||||
}
|
||||
|
||||
export function getCompletedJobs(): Promise<CompletedJob[]> {
|
||||
return jobsDB.load()
|
||||
}
|
||||
|
||||
export const OnJobAdded = {
|
||||
addListener(cb: (job: QueuedJob) => void) {
|
||||
emitter.on('job:add', cb)
|
||||
},
|
||||
removeListener(cb: (job: QueuedJob) => void) {
|
||||
emitter.off('job:add', cb)
|
||||
},
|
||||
}
|
||||
|
||||
export const OnJobCompleted = {
|
||||
addListener(cb: (job: CompletedJob) => void) {
|
||||
emitter.on('job:completed', cb)
|
||||
},
|
||||
removeListener(cb: (job: CompletedJob) => void) {
|
||||
emitter.off('job:completed', cb)
|
||||
},
|
||||
}
|
||||
|
@ -0,0 +1,81 @@
|
||||
import { readFile, writeFile } from 'fs/promises'
|
||||
|
||||
import { Mutex } from 'async-mutex'
|
||||
import { exists } from './utils'
|
||||
|
||||
import yaml from 'js-yaml'
|
||||
|
||||
export function createJsonDatabase<T>(filename: string, initialValue: T) {
|
||||
const mutex = new Mutex()
|
||||
|
||||
function ensureExists() {
|
||||
return mutex.runExclusive(async () => {
|
||||
if (!(await exists(filename))) {
|
||||
await writeFile(filename, JSON.stringify(initialValue, null, 2))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return {
|
||||
mutex,
|
||||
async load(): Promise<T> {
|
||||
await ensureExists()
|
||||
|
||||
return await mutex.runExclusive(async () => {
|
||||
const data = await readFile(filename, 'utf8')
|
||||
return JSON.parse(data) as T
|
||||
})
|
||||
},
|
||||
async update<R>(fn: (value: T) => Promise<R>) {
|
||||
await ensureExists()
|
||||
|
||||
return await mutex.runExclusive(async () => {
|
||||
const data = await readFile(filename, 'utf8')
|
||||
const value = JSON.parse(data) as T
|
||||
|
||||
const result = await fn(value)
|
||||
|
||||
await writeFile(filename, JSON.stringify(value, null, 2))
|
||||
|
||||
return result
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
export function createYamlDatabase<T>(filename: string, initialValue: T) {
|
||||
const mutex = new Mutex()
|
||||
|
||||
function ensureExists() {
|
||||
return mutex.runExclusive(async () => {
|
||||
if (!(await exists(filename))) {
|
||||
await writeFile(filename, yaml.dump(initialValue))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return {
|
||||
async load(): Promise<T> {
|
||||
await ensureExists()
|
||||
|
||||
return await mutex.runExclusive(async () => {
|
||||
const data = await readFile(filename, 'utf8')
|
||||
return yaml.load(data) as T
|
||||
})
|
||||
},
|
||||
async update<R>(fn: (value: T) => Promise<R>) {
|
||||
await ensureExists()
|
||||
|
||||
return await mutex.runExclusive(async () => {
|
||||
const data = await readFile(filename, 'utf8')
|
||||
const value = yaml.load(data) as T
|
||||
|
||||
const result = await fn(value)
|
||||
|
||||
await writeFile(filename, yaml.dump(value))
|
||||
|
||||
return result
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
@ -0,0 +1,38 @@
|
||||
import { access } from 'fs/promises'
|
||||
|
||||
export const normalizeURL = (url: string) => {
|
||||
if (!url.startsWith('/') && !url.startsWith('https://') && !url.startsWith('http://')) {
|
||||
url = `https://${url}`
|
||||
}
|
||||
|
||||
return url
|
||||
}
|
||||
|
||||
export const clsx = (...args: any[]) =>
|
||||
args
|
||||
.filter(Boolean)
|
||||
.flatMap(s => (typeof s === 'string' ? s.split(' ') : [s]))
|
||||
.join(' ')
|
||||
|
||||
/**
|
||||
* Modern alternative to fs.existsSync
|
||||
*/
|
||||
export async function exists(path: string) {
|
||||
try {
|
||||
await access(path)
|
||||
return true
|
||||
} catch (err) {
|
||||
// @ts-ignore
|
||||
if (err.code === 'ENOENT') {
|
||||
return false
|
||||
} else {
|
||||
throw err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function sleep(timeout: number) {
|
||||
return new Promise(resolve => {
|
||||
setTimeout(resolve, timeout)
|
||||
})
|
||||
}
|
@ -0,0 +1,8 @@
|
||||
import { inspect } from 'util'
|
||||
|
||||
export const debugToString = (value: any) =>
|
||||
typeof value === 'string' ? value : inspect(value, false, 5, true)
|
||||
|
||||
export const debug = (...args: any[]) => {
|
||||
process.stderr.write(args.map(arg => debugToString(arg)).join(' ') + '\n')
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
import { getCompletedJobs, getQueuedJobs } from '@/jobs'
|
||||
import { debug } from '@/logger'
|
||||
import type { APIRoute } from 'astro'
|
||||
|
||||
export const GET: APIRoute = async ({ request, params }) => {
|
||||
debug('[API] Jobs:', params)
|
||||
|
||||
const queuedJobs = await getQueuedJobs()
|
||||
const completedJobs = await getCompletedJobs()
|
||||
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
queuedJobs,
|
||||
completedJobs,
|
||||
}),
|
||||
{
|
||||
status: 200,
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
}
|
||||
)
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
import { OnJobAdded, OnJobCompleted, type CompletedJob, type QueuedJob } from '@/jobs'
|
||||
import { debug } from '@/logger'
|
||||
import type { APIRoute } from 'astro'
|
||||
|
||||
export const GET: APIRoute = async ({ request }) => {
|
||||
let jobAddedEvent: any
|
||||
let jobCompletedEvent: any
|
||||
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
const sendEvent = (data: any) => {
|
||||
controller.enqueue(`data: ${JSON.stringify(data)}\r\n\r\n`)
|
||||
}
|
||||
|
||||
jobAddedEvent = (job: QueuedJob) => sendEvent({ type: 'added', job })
|
||||
jobCompletedEvent = (job: CompletedJob) => sendEvent({ type: 'completed', job })
|
||||
|
||||
debug('[SSE] Registering client')
|
||||
|
||||
OnJobAdded.addListener(jobAddedEvent)
|
||||
OnJobCompleted.addListener(jobCompletedEvent)
|
||||
},
|
||||
cancel() {
|
||||
OnJobAdded.removeListener(jobAddedEvent)
|
||||
OnJobCompleted.removeListener(jobCompletedEvent)
|
||||
|
||||
debug('[SSE] Un-registered client')
|
||||
},
|
||||
})
|
||||
|
||||
return new Response(stream, {
|
||||
status: 200,
|
||||
headers: {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Connection': 'keep-alive',
|
||||
'Cache-Control': 'no-cache',
|
||||
},
|
||||
})
|
||||
}
|
@ -0,0 +1,10 @@
|
||||
---
|
||||
import Layout from '@layouts/Layout.astro'
|
||||
|
||||
import { JobsPage } from '@client/JobsPage.jsx'
|
||||
---
|
||||
|
||||
<Layout title="Deploys | phCD">
|
||||
<h1>Jobs</h1>
|
||||
<JobsPage client:load />
|
||||
</Layout>
|
@ -1,5 +0,0 @@
|
||||
export const clsx = (...args) =>
|
||||
args
|
||||
.filter(Boolean)
|
||||
.flatMap(s => (typeof s === 'string' ? s.split(' ') : [s]))
|
||||
.join(' ')
|
Loading…
Reference in New Issue