Skip to content

Commit

Permalink
Kafka container and demo pipeline fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ardem committed Oct 22, 2024
1 parent cedc31f commit 2c495d4
Show file tree
Hide file tree
Showing 10 changed files with 584 additions and 33 deletions.
9 changes: 5 additions & 4 deletions .env.common
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ YLEM_REDIS_HOST=ylem_session_storage
YLEM_REDIS_PORT=6379
YLEM_REDIS_PASSWORD=dtmnpassword

# Ylem requires your own Apache Kafka to be installed to be able to process pipelines and tasks.
# Add the URL of your Apache Kafka instance here or keep it as host.docker.internal:9092 if you have a standard
# Kafka set up on your host machine.
YLEM_KAFKA_BOOTSTRAP_SERVERS=host.docker.internal:9092
# Ylem uses Apache Kafka to exchange messages for processing pipelines and tasks.
# If you use our standard container defined in docker-compose.yml keep this value as ylem_kafka_broker:39092
# If you have your own standard Apache Kafka on the host machine keep it as host.docker.internal:9092
# Or modify it accordingly if you have a custom setup.
YLEM_KAFKA_BOOTSTRAP_SERVERS=ylem_kafka_broker:39092

YLEM_KAFKA_TASK_RUNS_LOAD_BALANCED_TOPIC=task_runs_load_balanced
YLEM_KAFKA_TASK_RUNS_TOPIC=task_runs
Expand Down
7 changes: 4 additions & 3 deletions backend/integrations/api/CreateSQLIntegration.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func createElasticsearchSQLIntegration(db *sql.DB, user *services.Authentication
return nil
}

err = repositories.CreateSQLIntegration(db, entity)
_, err = repositories.CreateSQLIntegration(db, entity)

if err != nil {
log.Error(err)
Expand Down Expand Up @@ -320,7 +320,7 @@ func createGenericSQLIntegration(db *sql.DB, user *services.AuthenticationData,
return nil
}

err = repositories.CreateSQLIntegration(db, entity)
ent, err := repositories.CreateSQLIntegration(db, entity)
if err != nil {
log.Error(err)
helpers.HttpReturnErrorInternal(w)
Expand All @@ -339,6 +339,7 @@ func createGenericSQLIntegration(db *sql.DB, user *services.AuthenticationData,
return nil
}

entity.Integration.Uuid = ent.Integration.Uuid
w.WriteHeader(http.StatusOK)
jsonResponse, _ := json.Marshal(entity)
_, err = w.Write(jsonResponse)
Expand Down Expand Up @@ -432,7 +433,7 @@ func createBigQuerySQLIntegration(db *sql.DB, user *services.AuthenticationData,
return nil
}

err = repositories.CreateSQLIntegration(db, entity)
_, err = repositories.CreateSQLIntegration(db, entity)

if err != nil {
log.Error(err)
Expand Down
16 changes: 8 additions & 8 deletions backend/integrations/repositories/SQLRepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
log "github.com/sirupsen/logrus"
)

func CreateSQLIntegration(db *sql.DB, entity *entities.SQLIntegration) error {
func CreateSQLIntegration(db *sql.DB, entity *entities.SQLIntegration) (*entities.SQLIntegration, error) {
log.Tracef("Creating SQL Integration")

tx, err := db.Begin()
defer tx.Rollback() //nolint:all
if err != nil {
log.Error(err)

return err
return nil, err
}

entity.Integration.Uuid = uuid.NewString()
Expand All @@ -40,7 +40,7 @@ func CreateSQLIntegration(db *sql.DB, entity *entities.SQLIntegration) error {
_ = tx.Rollback()
log.Error(err)

return err
return nil, err
}
defer stmt.Close()

Expand All @@ -59,7 +59,7 @@ func CreateSQLIntegration(db *sql.DB, entity *entities.SQLIntegration) error {
if err != nil {
_ = tx.Rollback()
log.Error(err)
return err
return nil, err
}

entity.Integration.Id, _ = result.LastInsertId()
Expand All @@ -73,7 +73,7 @@ func CreateSQLIntegration(db *sql.DB, entity *entities.SQLIntegration) error {
_ = tx.Rollback()
log.Error(err)

return err
return nil, err
}
defer stmt.Close()

Expand Down Expand Up @@ -105,17 +105,17 @@ func CreateSQLIntegration(db *sql.DB, entity *entities.SQLIntegration) error {
if err != nil {
_ = tx.Rollback()
log.Error(err)
return err
return nil, err
}
}

err = tx.Commit()
if err != nil {
log.Error(err)
return err
return nil, err
}

return nil
return entity, nil
}

func FindSQLIntegration(db *sql.DB, Uuid string) (*entities.SQLIntegration, error) {
Expand Down
31 changes: 31 additions & 0 deletions backend/pipelines/app/trial/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,37 @@ func createTrialPipeline(db *sql.DB, pipelineUuid string, pipelineName string, o
return err
}


tx, err = db.Begin()
if err != nil {
log.Error(err)
return err
}

tplWf, tasks, tts, err = loadPipelineData(db, newWfUuid)

Check failure on line 94 in backend/pipelines/app/trial/creator.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to tplWf (ineffassign)
if err != nil {
_ = tx.Rollback()
log.Error(err)
return err
}

var query *task.Query
var uQuery *task.HttpApiUpdatedQuery
for _, qTask := range tasks.Items {
if qTask.Type == task.TaskTypeQuery {
query, _ = task.GetQuery(db, qTask.Id)
uQuery.SourceUuid = sourceUuid
uQuery.SQLQuery = query.SQLQuery
_ = task.UpdateQueryTx(tx, qTask.Id, uQuery)
}
}

err = tx.Commit()
if err != nil {
log.Error(err)
return err
}

return nil
}

Expand Down
12 changes: 6 additions & 6 deletions backend/pipelines/db/migration/000002_pipeline_templates.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ INSERT INTO `aggregators` (`id`, `uuid`, `expression`, `variable_name`, `is_acti
VALUES
(1, '9f37a684-1aa9-46ca-8817-0119106b0dd4', X'53554D28616D6F756E7429', 'value', 1),
(2, 'bd76e868-daf0-475d-b8c9-684344116630', X'53554D28616D6F756E7429', 'value', 1),
(3, '23dc5b6c-c7c3-4161-8a0a-e51e25e41373', X'524F554E4428415647286E756D6265725F66616D696C696573292C20322C2022666C6F6F722229', 'value', 1),
(3, '23dc5b6c-c7c3-4161-8a0a-e51e25e41373', X'524F554E442841564728616D6F756E74292C20322C2022666C6F6F722229', 'value', 1),
(4, '859e5c14-9d22-42ce-8d57-5f24ae9707ca', X'41564728616D6F756E7429', 'value', 1),
(5, 'd6f110b4-9574-4658-ae4e-5bc14eac9665', X'4D494E28616D6F756E7429', 'value', 1),
(6, '15922fe2-6404-4ae1-a1f4-57c5d0f72637', X'4D415828616D6F756E7429', 'value', 1),
Expand Down Expand Up @@ -588,7 +588,7 @@ VALUES

INSERT INTO `codes` (`id`, `uuid`, `type`, `code`, `is_active`)
VALUES
(1, '05f11c00-ffdd-4a5f-846d-b9f1d897161a', '', X'726573756C74203D205B5D0A0A666F72206920696E20696E7075743A0A2020696620695B226E756D6265725F66616D696C696573225D206973206E6F74204E6F6E6520616E6420695B226E756D6265725F66616D696C696573225D203C20323030303A0A20202020726573756C742E617070656E642869290A0A696E707574203D20726573756C74', 1);
(1, '05f11c00-ffdd-4a5f-846d-b9f1d897161a', '', X'726573756C74203D205B5D0A0A666F72206920696E20696E7075743A0A2020696620695B22616D6F756E74225D206973206E6F74204E6F6E6520616E6420695B22616D6F756E74225D203C20323030303A0A20202020726573756C742E617070656E642869290A0A696E707574203D20726573756C74', 1);


INSERT INTO `conditions` (`id`, `uuid`, `expression`, `is_active`)
Expand Down Expand Up @@ -760,7 +760,7 @@ VALUES
(9, 'be3b9142-24f5-4243-beb2-e8b15cabe487', '', X'53454C454354202A0A46524F4D20747269616C5F64617461626173652E637573746F6D6572733B', 1),
(10, '964dc92d-9f96-4009-9aa8-44ea10164418', '', X'53454C454354202A0A46524F4D20747269616C5F64617461626173652E7061796D656E74733B', 1),
(11, '05f6ca8b-2b8c-4abb-ba4b-7512473751a1', '', X'53454C454354202A0A46524F4D20747269616C5F64617461626173652E6F72646572733B', 1),
(12, '563e4981-87a6-44d2-ab3c-e434fc7d28c1', '', X'53454C454354202A0A46524F4D205266616D2E76657273696F6E0A4C494D495420310A3B', 1),
(12, '563e4981-87a6-44d2-ab3c-e434fc7d28c1', '', X'53454C454354202A0A46524F4D2066696E616E63652E696E766F696365730A3B', 1),
(13, '4b2880af-cded-43b8-9307-fbe6c56968f6', '', X'53454C454354202A0A46524F4D20747269616C5F64617461626173652E6F72646572733B', 1),
(14, 'a2829443-f6f5-4e78-b71f-49d0b3ee9e90', '', X'53454C454354202A0A46524F4D20747269616C5F64617461626173652E6F72646572733B', 1),
(15, '13e72f79-dc85-4185-97db-d6e756cb908a', '', X'53454C454354202A0A46524F4D20747269616C5F64617461626173652E6F72646572733B', 1),
Expand Down Expand Up @@ -794,13 +794,13 @@ VALUES
(43, 'ed1c305e-6be9-47b4-8798-fc19a21e6dbb', '', X'', 1),
(44, '364e9d81-54e1-4e8d-bfd6-0d0975a19495', '', X'', 1),
(45, '2e4a07ad-d3a7-414a-8bd9-0a0b596aaadf', '', X'53454C454354202A0A46524F4D205266616D2E736565645F726567696F6E0A4C494D495420310A3B', 1),
(46, '6e4a30fa-b4b8-4f9a-b9c8-3deb8ddad64a', '', X'53454C454354202A0A46524F4D20637573746F6D65725F737563636573732E6F7267616E697A6174696F6E730A3B', 1),
(47, 'eb1dc26b-190f-4611-a366-904834726b33', '', X'53454C454354202A0A46524F4D205266616D2E76657273696F6E0A4C494D495420310A3B', 1),
(46, '6e4a30fa-b4b8-4f9a-b9c8-3deb8ddad64a', '', X'53454C454354202A0A46524F4D2066696E616E63652E696E766F696365730A3B', 1),
(47, 'eb1dc26b-190f-4611-a366-904834726b33', '', X'53454C454354202A0A46524F4D2066696E616E63652E696E766F696365730A3B', 1),
(48, '1050a550-a358-4cf1-8448-5695f90a1b03', '', X'', 1),
(49, 'f8255205-d651-4a17-b5e4-b05a5ce51f89', '', X'', 1),
(50, '58023f9a-fed3-4a17-9c7d-150c874cd9e8', '', X'', 1),
(51, '0b4bd16a-fd3a-4013-bae7-42b3eb2ccf3c', '', X'53454C454354202A0A46524F4D2074656D706C617465312E6E756C6C0A3B', 1),
(52, '96b43e21-f1b4-458a-a1d9-04186d1cc536', '', X'53454C454354202A0A46524F4D205266616D2E76657273696F6E0A4C494D495420310A3B', 1),
(52, '96b43e21-f1b4-458a-a1d9-04186d1cc536', '', X'53454C454354202A0A46524F4D2066696E616E63652E696E766F696365730A3B', 1),
(53, 'f160e3f6-0378-43c0-a2e4-560218fb3007', '', X'53454C454354202A0A46524F4D2068722E68697265730A574845524520594541522868697265645F617429203D20594541522843555252454E545F4441544528292920414E44204D4F4E54482868697265645F617429203D204D4F4E54482843555252454E545F444154452829290A3B', 1),
(54, 'f773c4f2-c4f4-43e5-8e9e-d1a5d621dbba', '', X'53454C454354202A0A46524F4D2065636F6D6D657263652E6F72646572730A5748455245205945415228637265617465645F617429203D20594541522843555252454E545F4441544528292920414E44204D4F4E544828637265617465645F617429203D204D4F4E54482843555252454E545F444154452829290A3B', 1),
(55, '0027a56a-03a2-4cb2-bd7f-bd9a33a14a58', '', X'53454C454354202A0A46524F4D2065636F6D6D657263652E6F72646572730A574845524520637265617465645F6174203E20444154455F535542284E4F5728292C20494E54455256414C20333020444159290A2020414E4420737461747573203D2022636F6D706C65746564220A3B', 1),
Expand Down
4 changes: 2 additions & 2 deletions backend/statistics/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ services:
container_name: ylem_statistics_database
healthcheck:
interval: 10s
retries: 5
retries: 50
test: wget --no-verbose --tries=1 --spider http://127.0.0.1:8123/ping || exit 1
timeout: 10s
timeout: 15s

ylem_statistics_migrations:
env_file:
Expand Down
12 changes: 6 additions & 6 deletions backend/users/services/CreateTrialDBDataSource.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ type dataSource struct {
func newTrialDataSource(organizationUuid string) *dataSource {
d := dataSource{OrganizationUuid: organizationUuid}
d.Type = "mysql"
d.Name = "Public Rfam Database for testing"
d.Host = "mysql-rfam-public.ebi.ac.uk"
d.Port = 4497
d.User = "rfamro"
d.Password = ""
d.Database = "Rfam"
d.Name = "Ylem test"
d.Host = "ylem_database"
d.Port = 3306
d.User = "dtmntestuser"
d.Password = "dtmntestpassword"
d.Database = ""
d.ConnectionType = "direct"

return &d
Expand Down
Loading

0 comments on commit 2c495d4

Please sign in to comment.