Skip to content

Commit

Permalink
Merge pull request #179 from pixlise/bug/quant
Browse files Browse the repository at this point in the history
Bug/quant
  • Loading branch information
pnemere authored Mar 14, 2024
2 parents 2b912d5 + a62c88e commit 6b515f1
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 176 deletions.
10 changes: 3 additions & 7 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,13 @@ jobs:
cd _out
rm pixlise-api-linux
wget https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem
mkdir jobupdater datasourceupdater integrationtest dataimport
mkdir dataimport integrationtest
zip -q dataimport/dataimport-linux-${{ needs.version.outputs.version }}.zip rds-combined-ca-bundle.pem bootstrap
rm bootstrap
# mkdir test-data
# cp ../internal/cmdline-tools/import-integration-test/test-data/000000001-21-10-2022-15-37-00.zip ./test-data
# zip -q jobupdater/jobupdater-linux-${{ needs.version.outputs.version }}.zip rds-combined-ca-bundle.pem jobupdater-linux
# rm jobupdater-linux
# zip -q datasourceupdater/datasourceupdater-linux-${{ needs.version.outputs.version }}.zip rds-combined-ca-bundle.pem datasourceupdater-linux
# rm datasourceupdater-linux
# zip -q integrationtest-linux-${{ needs.version.outputs.version }}.zip rds-combined-ca-bundle.pem integrationtest-linux
# rm integrationtest-linux
zip -q dataimport/dataimport-linux-${{ needs.version.outputs.version }}.zip rds-combined-ca-bundle.pem dataimport-linux
rm dataimport-linux
# zip -q importtest/importtest-linux-${{ needs.version.outputs.version }}.zip importtest-linux test-data/000000001-21-10-2022-15-37-00.zip
# rm importtest-linux
aws s3 cp . s3://corestack-buildartifactsf774a77d-105on4pno9pjm/ --recursive --region us-east-1
Expand Down
6 changes: 2 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@ build-linux:
echo "sha: ${GITHUB_SHA}"
GOOS=linux GOARCH=amd64 go run ./data-formats/codegen/main.go -protoPath ./data-formats/api-messages/ -goOutPath ./api/ws/
GOOS=linux GOARCH=amd64 go build -ldflags "-X 'github.com/pixlise/core/v4/api/services.ApiVersion=${BUILD_VERSION}' -X 'github.com/pixlise/core/v4/api/services.GitHash=${GITHUB_SHA}'" -v -o ./_out/pixlise-api-linux ./internal/api
# GOOS=linux GOARCH=amd64 go build -v -o ./_out/jobupdater-linux ./internal/lambdas/quant-job-updater
# GOOS=linux GOARCH=amd64 go build -v -o ./_out/datasourceupdater-linux ./internal/lambdas/dataset-tile-updater
# GOOS=linux GOARCH=amd64 go build -v -o ./_out/integrationtest-linux ./internal/cmdline-tools/api-integration-test
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -v -o ./_out/dataimport-linux ./internal/lambdas/data-import
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -v -o ./_out/bootstrap ./internal/lambdas/data-import
# GOOS=linux GOARCH=amd64 go build -v -o ./_out/importtest-linux ./internal/cmdline-tools/import-integration-test
# GOOS=linux GOARCH=amd64 go build -v -o ./_out/integrationtest-linux ./internal/cmdline-tools/api-integration-test

# build-mac:
# GOPRIVATE=github.com/pixlise GOOS=darwin GOARCH=amd64 go build -ldflags "-X services.ApiVersion=${BUILD_VERSION} -X services.GitHash=${GITHUB_SHA}" -v -o ./_out/pixlise-api-mac ./internal/api
Expand Down
143 changes: 8 additions & 135 deletions api/quantification/quantRunner/runnerKubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/pixlise/core/v4/api/config"
Expand Down Expand Up @@ -95,74 +94,6 @@ func (r *kubernetesRunner) RunPiquant(piquantDockerImage string, params PiquantP
}
}

// getPodObject builds the Kubernetes Pod manifest for a single piquant
// invocation. The pod runs dockerImage with the serialized piquant params
// passed via the QUANT_PARAMS environment variable.
//
// paramsStr is the JSON-encoded form of params; jobid/namespace place the pod;
// requestorUserId is recorded as the "owner" label; length is the total pod
// count for this job (recorded as the "numberofpods" label).
func getPodObject(paramsStr string, params PiquantParams, dockerImage string, jobid, namespace string, requestorUserId string, length int) *apiv1.Pod {
	nameParts := strings.Split(params.PMCListName, ".")
	nodeName := nameParts[0]
	appLabel := "piquant-runner"
	podName := fmt.Sprintf("piquant-%s", params.Command)

	// Set the serviceaccount for the piquant pods based on namespace.
	// Piquant Fit commands run in the same namespace and share a service account;
	// Piquant Map commands (jobs) run in the piquant-map namespace with a more
	// limited service account, and need much more CPU (which is safe to request
	// since they run on Fargate nodes).
	serviceAccount, cpuRequest := "pixlise-api", "250m"
	if params.Command == "map" {
		serviceAccount, cpuRequest = "piquant-map", "3500m"
	}

	labels := map[string]string{
		"pixlise.org/application":     appLabel,
		"pixlise.org/environment":     params.RunTimeEnv,
		"app.kubernetes.io/name":      podName,
		"app.kubernetes.io/instance":  fmt.Sprintf("%s-%s", podName, nodeName),
		"app.kubernetes.io/component": appLabel,
		"piquant/command":             params.Command,
		"app":                         nodeName,
		// Kubernetes doesn't like | in owner name, so we swap it for a _ here
		"owner":        strings.ReplaceAll(requestorUserId, "|", "_"),
		"jobid":        jobid,
		"numberofpods": strconv.Itoa(length),
	}

	container := apiv1.Container{
		Name:            nodeName,
		Image:           dockerImage,
		ImagePullPolicy: apiv1.PullAlways,
		Resources: apiv1.ResourceRequirements{
			Requests: apiv1.ResourceList{
				// The request determines how much cpu is reserved on the Node and will affect scheduling
				"cpu": resource.MustParse(cpuRequest),
			},
			Limits: apiv1.ResourceList{
				// Allow the pod to use up to 3500m cpu if it's available on the node
				"cpu": resource.MustParse("3500m"),
			},
		},
		Env: []apiv1.EnvVar{
			{Name: "QUANT_PARAMS", Value: paramsStr},
			{Name: "PYTHONUNBUFFERED", Value: "TRUE"},
		},
	}

	return &apiv1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      jobid + "-" + nodeName,
			Namespace: namespace,
			Labels:    labels,
		},
		Spec: apiv1.PodSpec{
			ImagePullSecrets:   []apiv1.LocalObjectReference{{Name: "api-auth"}},
			RestartPolicy:      apiv1.RestartPolicyNever,
			ServiceAccountName: serviceAccount,
			Containers:         []apiv1.Container{container},
		},
	}
}

// getJobObject generates a Kubernetes Job Manifest for running piquant.
// It takes in the following parameters:
// - params: a PiquantParams struct containing all parameters needed by piquant
Expand Down Expand Up @@ -244,6 +175,11 @@ func getJobObject(params PiquantParams, paramsStr, dockerImage, jobId, namespace
Env: []apiv1.EnvVar{
{Name: "QUANT_PARAMS", Value: paramsStr},
{Name: "PYTHONUNBUFFERED", Value: "TRUE"},
/*{Name: "NODE_INDEX", ValueFrom: &apiv1.EnvVarSource{
FieldRef: &apiv1.ObjectFieldSelector{
FieldPath: "metadata.annotations['batch.kubernetes.io/job-completion-index']",
},
}},*/
},
},
},
Expand All @@ -266,14 +202,14 @@ func (r *kubernetesRunner) runQuantJob(params PiquantParams, jobId, namespace, d
defer close(status)
paramsJSON, err := json.Marshal(params)
if err != nil {
r.kubeHelper.Log.Errorf("Failed to serialise JSON params for node: %v", params.PMCListName)
r.kubeHelper.Log.Errorf("Failed to serialise JSON params for node: %v", jobId)
return
}
paramsStr := string(paramsJSON)
jobSpec := getJobObject(params, paramsStr, dockerImage, jobId, namespace, requestorUserId, count)
job, err := r.kubeHelper.Clientset.BatchV1().Jobs(jobSpec.Namespace).Create(context.Background(), jobSpec, metav1.CreateOptions{})
if err != nil {
r.kubeHelper.Log.Errorf("Job create failed for: %v. namespace: %v, count: %v", params.PMCListName, namespace, count)
r.kubeHelper.Log.Errorf("Job create failed for: %v. namespace: %v, count: %v", jobId, namespace, count)
r.fatalErrors <- err
return
}
Expand All @@ -283,7 +219,7 @@ func (r *kubernetesRunner) runQuantJob(params PiquantParams, jobId, namespace, d
time.Sleep(5 * time.Second)
jobStatus, err := r.getJobStatus(job.Namespace, job.Name)
if err != nil {
r.kubeHelper.Log.Errorf("Failed to get job status for: %v. namespace: %v, count: %v", params.PMCListName, namespace, count)
r.kubeHelper.Log.Errorf("Failed to get job status for: %v. namespace: %v, count: %v", jobId, namespace, count)
r.fatalErrors <- err
return
}
Expand All @@ -294,66 +230,3 @@ func (r *kubernetesRunner) runQuantJob(params PiquantParams, jobId, namespace, d
}
}
}

// runQuantPod creates a single piquant pod and polls its status every 5s until
// it leaves the Pending/Running phases or the configured timeout elapses, then
// deletes the pod. Intended to be run as a goroutine; signals completion via wg.
// Fatal errors (pod creation failure) are reported on r.fatalErrors.
func (r *kubernetesRunner) runQuantPod(wg *sync.WaitGroup, params PiquantParams, jobid string, namespace string, dockerImage string, requestorUserId string, count int) {
	defer wg.Done()

	// Make a JSON string out of params so it can be passed in via env var
	paramsJSON, err := json.Marshal(params)
	if err != nil {
		r.kubeHelper.Log.Errorf("Failed to serialise JSON params for node: %v", params.PMCListName)
		return
	}
	paramsStr := string(paramsJSON)

	pod := getPodObject(paramsStr, params, dockerImage, jobid, namespace, requestorUserId, count)

	co := metav1.CreateOptions{}
	pod, err = r.kubeHelper.Clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, co)
	if err != nil {
		r.kubeHelper.Log.Errorf("Pod create failed for: %v. namespace: %v, count: %v", params.PMCListName, namespace, count)
		r.fatalErrors <- err
		return
	}

	r.kubeHelper.Log.Infof("Creating pod for %v in namespace %v...", params.PMCListName, namespace)

	// Poll until the pod finishes or we hit the configured timeout
	startUnix := time.Now().Unix()
	maxEndUnix := startUnix + config.KubernetesMaxTimeoutSec

	lastPhase := ""

	for currUnix := time.Now().Unix(); currUnix < maxEndUnix; currUnix = time.Now().Unix() {
		// Check kubernetes pod status.
		// BUG FIX: the Get error was previously discarded; on a transient API
		// failure client-go returns an empty pod whose phase ("") is neither
		// Running nor Pending, so the loop wrongly treated the pod as finished,
		// deleted it and exited early. Retry on error instead.
		latest, err := r.kubeHelper.Clientset.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
		if err != nil {
			r.kubeHelper.Log.Errorf("Failed to get pod status for: %v, namespace: %v: %v", pod.Name, pod.Namespace, err)
			time.Sleep(5 * time.Second)
			continue
		}

		// Only log phase transitions, not every poll
		phase := string(latest.Status.Phase)
		if lastPhase != phase {
			r.kubeHelper.Log.Infof("%v phase: %v, pod name: %v, namespace: %v", params.PMCListName, latest.Status.Phase, latest.Name, latest.Namespace)
			lastPhase = phase
		}

		// Anything other than Pending/Running (Succeeded, Failed, Unknown) is
		// treated as terminal: clean up the pod and stop polling
		if latest.Status.Phase != apiv1.PodRunning && latest.Status.Phase != apiv1.PodPending {
			r.kubeHelper.Log.Infof("Deleting pod: %v from namespace: %v", latest.Name, latest.Namespace)

			// Foreground propagation so dependents are removed before the pod
			deletePolicy := metav1.DeletePropagationForeground
			do := &metav1.DeleteOptions{
				PropagationPolicy: &deletePolicy,
			}
			err := r.kubeHelper.Clientset.CoreV1().Pods(latest.Namespace).Delete(context.TODO(), latest.Name, *do)
			if err != nil {
				r.kubeHelper.Log.Errorf("Failed to remove pod: %v, namespace: %v\n", latest.Name, latest.Namespace)
			}
			break
		}

		time.Sleep(5 * time.Second)
	}
}
2 changes: 1 addition & 1 deletion data-formats
67 changes: 38 additions & 29 deletions generated-protos/screen-configuration.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 6b515f1

Please sign in to comment.