Skip to content

Commit

Permalink
feat: 增加Oban实现在后台任务队列里,继续翻译未翻译完内容
Browse files Browse the repository at this point in the history
  • Loading branch information
shuiRong committed Dec 24, 2024
1 parent ec98d43 commit 51057c3
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 42 deletions.
5 changes: 5 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ config :honyaku, HonyakuWeb.Endpoint,
# at the `config/runtime.exs`.
config :honyaku, Honyaku.Mailer, adapter: Swoosh.Adapters.Local

config :honyaku, Oban,
engine: Oban.Engines.Basic,
queues: [default: 10, translate: 10],
repo: Honyaku.Repo

# Configure esbuild (the version is required)
config :esbuild,
version: "0.17.11",
Expand Down
3 changes: 3 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ config :honyaku, HonyakuWeb.Endpoint,
# In test we don't send emails
config :honyaku, Honyaku.Mailer, adapter: Swoosh.Adapters.Test

# 为了防止 Oban 在测试运行期间运行作业和插件
config :honyaku, Oban, testing: :inline

# Disable swoosh api client as it is only required for production adapters
config :swoosh, :api_client, false

Expand Down
3 changes: 2 additions & 1 deletion lib/honyaku/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ defmodule Honyaku.Application do
# Start a worker by calling: Honyaku.Worker.start_link(arg)
# {Honyaku.Worker, arg},
# Start to serve requests, typically the last entry
HonyakuWeb.Endpoint
HonyakuWeb.Endpoint,
{Oban, Application.fetch_env!(:honyaku, Oban)}
]

# See https://hexdocs.pm/elixir/Supervisor.html
Expand Down
8 changes: 4 additions & 4 deletions lib/honyaku/contexts/feeds/feed.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ defmodule Honyaku.Feeds do
alias Honyaku.Repo
alias Honyaku.Feeds.Feed
alias Honyaku.Feeds.Article
alias Honyaku.Feeds.ParseFeed
alias Honyaku.Feeds.Parser
alias Honyaku.Utils.DateTimeUtils

def load_translated_feed(url, target_lang, source_lang) do
with {:ok, parsed_feed} <- load_feed(url),
{:ok, feed, save_feed_and_articles} <- save_feed_and_articles(url, parsed_feed),
{:ok, translated_feed} <-
ParseFeed.translate_feed(feed, save_feed_and_articles, target_lang, source_lang) do
Parser.translate_feed(feed, save_feed_and_articles, target_lang, source_lang) do
{:ok, translated_feed}
end
end

def load_feed(url) do
with {:ok, raw_content} <- fetch_feed_content(url),
{:ok, feed_type} <- ParseFeed.detect_feed_type(raw_content),
{:ok, parsed_feed} <- ParseFeed.parse_feed(feed_type, raw_content) do
{:ok, feed_type} <- Parser.detect_feed_type(raw_content),
{:ok, parsed_feed} <- Parser.parse_feed(feed_type, raw_content) do
{:ok, parsed_feed}
end
end
Expand Down
2 changes: 2 additions & 0 deletions lib/honyaku/contexts/feeds/model/article.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule Honyaku.Feeds.Article.Content do
use Ecto.Schema
@derive {Jason.Encoder, only: [:id, :type, :value]}

embedded_schema do
field :type, :string
Expand All @@ -9,6 +10,7 @@ end

defmodule Honyaku.Feeds.Article.Summary do
use Ecto.Schema
@derive {Jason.Encoder, only: [:id, :type, :value]}

embedded_schema do
field :type, :string
Expand Down
171 changes: 136 additions & 35 deletions lib/honyaku/contexts/feeds/parser.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Honyaku.Feeds.ParseFeed do
defmodule Honyaku.Feeds.Parser do
@moduledoc """
解析、转换、翻译 RSS 订阅源
"""
Expand Down Expand Up @@ -173,28 +173,109 @@ defmodule Honyaku.Feeds.ParseFeed do
article_summary_task =
get_feed_article_field_translation(article, "summary", target_lang, source_lang)

# 同时等待所有字段的翻译任务完成
with {:ok, translated_title} <- Task.await(article_title_task, 1_000 * 60),
{:ok, translated_content} <- Task.await(article_content_task, 1_000 * 60),
{:ok, translated_summary} <- Task.await(article_summary_task, 1_000 * 60) do
{:ok,
%{
article
| title: translated_title,
content: %{
value: translated_content,
type: article.content.type
},
summary: %{
value: translated_summary,
type: article.summary.type
}
}}
else
{:error, reason} ->
Logger.error("条目翻译失败 #{inspect(reason)}")
{:ok, article}
end
tasks_with_results =
Task.yield_many(
[article_title_task, article_content_task, article_summary_task],
timeout: 1_000 * 60,
on_timeout: :kill_task
)

[title_result, content_result, summary_result] =
Enum.map(tasks_with_results, fn
{_task, {:ok, {:ok, translated_text}}} -> {:ok, translated_text}
{_task, {:ok, {:error, reason}}} -> {:error, reason}
{_task, {:exit, reason}} -> {:error, reason}
{_task, nil} -> {:error, :timeout}
end)

# 处理标题翻译
translated_title =
case title_result do
{:ok, text} ->
text

{:error, _reason} ->
# 创建标题翻译任务
%{
saved_article: %{
id: article.id,
title: article.title,
content: article.content,
summary: article.summary
},
field: "title",
target_lang: target_lang,
source_lang: source_lang
}
|> Honyaku.TranslateJob.new()
|> Oban.insert()

article.title
end

# 处理内容翻译
translated_content =
case content_result do
{:ok, text} ->
text

{:error, _reason} ->
# 创建内容翻译任务
%{
saved_article: %{
id: article.id,
title: article.title,
content: article.content,
summary: article.summary
},
field: "content",
target_lang: target_lang,
source_lang: source_lang
}
|> Honyaku.TranslateJob.new()
|> Oban.insert()

article.content.value
end

# 处理摘要翻译
translated_summary =
case summary_result do
{:ok, text} ->
text

{:error, _reason} ->
# 创建摘要翻译任务
%{
saved_article: %{
id: article.id,
title: article.title,
content: article.content,
summary: article.summary
},
field: "summary",
target_lang: target_lang,
source_lang: source_lang
}
|> Honyaku.TranslateJob.new()
|> Oban.insert()

article.summary.value
end

{:ok,
%{
article
| title: translated_title,
content: %{
value: translated_content,
type: article.content.type
},
summary: %{
value: translated_summary,
type: article.summary.type
}
}}
end)
end)

Expand All @@ -206,6 +287,21 @@ defmodule Honyaku.Feeds.ParseFeed do

{:error, reason} ->
Logger.debug("标题翻译失败:#{inspect(reason)}")

# 创建翻译任务
%{
saved_feed: %{
id: saved_feed.id,
title: saved_feed.title,
subtitle: saved_feed.subtitle
},
field: "title",
target_lang: target_lang,
source_lang: source_lang
}
|> Honyaku.TranslateJob.new()
|> Oban.insert()

# 使用原始标题作为回退
saved_feed.title
end
Expand All @@ -217,6 +313,21 @@ defmodule Honyaku.Feeds.ParseFeed do

{:error, reason} ->
Logger.error("副标题翻译失败:#{inspect(reason)}")

# 创建翻译任务
%{
saved_feed: %{
id: saved_feed.id,
title: saved_feed.title,
subtitle: saved_feed.subtitle
},
field: "subtitle",
target_lang: target_lang,
source_lang: source_lang
}
|> Honyaku.TranslateJob.new()
|> Oban.insert()

# 使用原始副标题作为兜底
saved_feed.subtitle
end
Expand All @@ -225,14 +336,8 @@ defmodule Honyaku.Feeds.ParseFeed do
translated_articles =
article_articles
|> Enum.map(fn task ->
case Task.await(task, 1_000 * 60) do
{:ok, article} ->
article

{:error, reason} ->
Logger.error("条目翻译失败:#{inspect(reason)}")
{:error, reason}
end
{:ok, article} = Task.await(task, 1_000 * 60)
article
end)

{:ok,
Expand Down Expand Up @@ -320,10 +425,6 @@ defmodule Honyaku.Feeds.ParseFeed do
article_id: saved_article.id
}) do
{:ok, translated_text}
else
{:error, reason} ->
Logger.debug("条目 #{field} 翻译失败 #{inspect(reason)}, 使用原始文本兜底")
{:ok, text}
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/honyaku/external/deepl_translator/mymemory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Honyaku.External.DeeplTranslator.MyMemory do
}

case Req.post(
"#{@base_url}/mymemory",
"#{@base_url}/mymemory/",
json: body
) do
{:ok,
Expand Down
35 changes: 35 additions & 0 deletions lib/honyaku/job/translate_job.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
defmodule Honyaku.TranslateJob do
use Oban.Worker, queue: :translate, unique: true

require Logger

alias Honyaku.Feeds.Parser

@impl Oban.Worker
def perform(%Oban.Job{
args: %{
"saved_feed" => saved_feed,
"field" => field,
"target_lang" => target_lang,
"source_lang" => source_lang
}
}) do
Parser.translate_and_save_feed_field(saved_feed, field, target_lang, source_lang)

:ok
end

@impl Oban.Worker
def perform(%Oban.Job{
args: %{
"saved_article" => saved_article,
"field" => field,
"target_lang" => target_lang,
"source_lang" => source_lang
}
}) do
Parser.translate_and_save_article_field(saved_article, field, target_lang, source_lang)

:ok
end
end
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ defmodule Honyaku.MixProject do
{:fast_rss, "~> 0.5.0"},
{:req, "~> 0.5.0"},
{:atomex, "~> 0.5.1"},
{:timex, "~> 3.7.11"}
{:timex, "~> 3.7.11"},
{:oban, "~> 2.17"}
]
end

Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
"oban": {:hex, :oban, "2.18.3", "1608c04f8856c108555c379f2f56bc0759149d35fa9d3b825cb8a6769f8ae926", [:mix], [{:ecto_sql, "~> 3.10", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:ecto_sqlite3, "~> 0.9", [hex: :ecto_sqlite3, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.16", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "36ca6ca84ef6518f9c2c759ea88efd438a3c81d667ba23b02b062a0aa785475e"},
"parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"},
"phoenix": {:hex, :phoenix, "1.7.14", "a7d0b3f1bc95987044ddada111e77bd7f75646a08518942c72a8440278ae7825", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "c7859bc56cc5dfef19ecfc240775dae358cbaa530231118a9e014df392ace61a"},
"phoenix_ecto": {:hex, :phoenix_ecto, "4.6.3", "f686701b0499a07f2e3b122d84d52ff8a31f5def386e03706c916f6feddf69ef", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.1", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.16 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}], "hexpm", "909502956916a657a197f94cc1206d9a65247538de8a5e186f7537c895d95764"},
Expand Down
13 changes: 13 additions & 0 deletions priv/repo/migrations/20241224141808_add_oban_jobs_table.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule Honyaku.Repo.Migrations.AddObanJobsTable do
use Ecto.Migration

def up do
Oban.Migration.up(version: 12)
end

# We specify `version: 1` in `down`, ensuring that we'll roll all the way back down if
# necessary, regardless of which version we've migrated `up` to.
def down do
Oban.Migration.down(version: 1)
end
end

0 comments on commit 51057c3

Please sign in to comment.