Skip to content

Commit

Permalink
Implemented new schema design, refs #2
Browse files Browse the repository at this point in the history
  • Loading branch information
simonw committed Nov 5, 2023
1 parent 21432c7 commit 58b4990
Showing 1 changed file with 52 additions and 23 deletions.
75 changes: 52 additions & 23 deletions datasette_enrichments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@
CREATE_JOB_TABLE_SQL = """
create table if not exists _enrichment_jobs (
id integer primary key,
enrichment text,
status text, -- [p]ending, [r]unning, [c]ancelled, [f]inished
enrichment text, -- slug of enrichment
database_name text,
table_name text,
filter_querystring text,
config text,
started_at text,
completed_at text,
next_cursor text,
row_count integer
filter_querystring text, -- querystring used to filter rows
config text, -- JSON dictionary of config
started_at text, -- ISO8601 when added
finished_at text, -- ISO8601 when completed or cancelled
cancel_reason text, -- null or reason for cancellation
next_cursor text, -- next cursor to fetch
row_count integer, -- number of rows to enrich at start
error_count integer, -- number of rows with errors encountered
done_count integer, -- number of rows processed
actor_id text, -- optional ID of actor who created the job
cost_100ths_cent integer -- cost of job so far in 1/100ths of a cent
)
""".strip()

Expand All @@ -32,11 +38,15 @@
class Enrichment:
batch_size = 100
runs_in_process = False
# Cancel run after this many errors
default_max_errors = 5

async def initialize(self, db, table, config):
pass

async def enqueue(self, datasette, db, table, filter_querystring, config):
async def enqueue(
self, datasette, db, table, filter_querystring, config, actor_id=None
):
# Enqueue a job
qs = filter_querystring
if qs:
Expand All @@ -52,19 +62,24 @@ def _insert(conn):
cursor = conn.execute(
"""
insert into _enrichment_jobs (
enrichment, database_name, table_name, filter_querystring, config, started_at, row_count
enrichment, status, database_name, table_name, filter_querystring,
config, started_at, row_count, error_count, done_count, cost_100ths_cent, actor_id
) values (
?, ?, ?, ?, ?, datetime('now'), ?
:enrichment, 'p', :database_name, :table_name, :filter_querystring, :config,
datetime('now'), :row_count, 0, 0, 0{}
)
""",
(
self.slug,
db.name,
table,
filter_querystring,
json.dumps(config or {}),
row_count,
""".format(
", :actor_id" if actor_id else ", null"
),
{
"enrichment": self.slug,
"database_name": db.name,
"table_name": table,
"filter_querystring": filter_querystring,
"config": json.dumps(config or {}),
"row_count": row_count,
"actor_id": actor_id,
},
)
return cursor.lastrowid

Expand Down Expand Up @@ -105,14 +120,27 @@ async def run_enrichment():
next_cursor = response.json()["next"]
if next_cursor:
await db.execute_write(
"update _enrichment_jobs set next_cursor = ? where id = ?",
(next_cursor, job["id"]),
"""
update _enrichment_jobs
set
next_cursor = ?,
done_count = done_count + ?
where id = ?
""",
(next_cursor, len(rows), job["id"]),
)
else:
# Mark complete
await db.execute_write(
"update _enrichment_jobs set completed_at = datetime('now') where id = ?",
(job["id"],),
"""
update _enrichment_jobs
set
finished_at = datetime('now'),
status = 'f',
done_count = done_count + ?
where id = ?
""",
(len(rows), job["id"]),
)
break

Expand All @@ -136,12 +164,13 @@ async def enrich_batch(self, db, table, rows, config):
await db.execute_write_many(
"update [{}] set {} where {}".format(table, sets, wheres), params
)
await asyncio.sleep(1)
await asyncio.sleep(0.3)


class Embeddings(Enrichment):
name = "OpenAI Embeddings"
slug = "openai-embeddings"
batch_size = 100
description = (
"Calculate embeddings for text columns in a table. Embeddings are numerical representations which "
"can be used to power semantic search and find related content."
Expand Down

0 comments on commit 58b4990

Please sign in to comment.