From c77f91fa93a382d62706fd5458b5d6a92289d768 Mon Sep 17 00:00:00 2001 From: Derek Su Date: Wed, 4 Dec 2024 12:28:54 +0800 Subject: [PATCH 1/3] chore(vendor): update dependencies Longhorn 9104 Signed-off-by: Derek Su --- go.mod | 2 ++ go.sum | 4 ++-- .../longhorn/types/pkg/generated/spdkrpc/spdk.pb.go | 13 ++++++++++++- vendor/modules.txt | 2 +- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 4166e234..ae971297 100644 --- a/go.mod +++ b/go.mod @@ -59,3 +59,5 @@ require ( k8s.io/mount-utils v0.31.3 // indirect k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect ) + +replace github.com/longhorn/types v0.0.0-20241123075624-48c550af4eab => github.com/derekbit/longhorn-types v0.0.0-20241128141620-e298a5a080b7 diff --git a/go.sum b/go.sum index 80bab85a..60cd0f52 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/derekbit/longhorn-types v0.0.0-20241128141620-e298a5a080b7 h1:iuOCk/GzWJKVCyVOzp8bfErGusrHuaSHJxFjyID07PM= +github.com/derekbit/longhorn-types v0.0.0-20241128141620-e298a5a080b7/go.mod h1:dIPa2yMBPOa514bn3pohT7kbVAQcZbZSWItpgxunuPs= github.com/gammazero/deque v0.2.0 h1:SkieyNB4bg2/uZZLxvya0Pq6diUlwx7m2TeT7GAIWaA= github.com/gammazero/deque v0.2.0/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q= @@ -50,8 +52,6 @@ github.com/longhorn/go-common-libs v0.0.0-20241128023039-4d6c3a880dbc h1:Ok7qdNu github.com/longhorn/go-common-libs v0.0.0-20241128023039-4d6c3a880dbc/go.mod h1:gSa+qB058kcNlCaOOwIFPHb3tvqMTmKcxtL7HPTS4o4= github.com/longhorn/go-spdk-helper v0.0.0-20241202131855-7d9a097456b2 h1:r55x3FfMPn5mj2aXLvJT6ijfa9BkWQSL+qNZK1CKsb4= github.com/longhorn/go-spdk-helper v0.0.0-20241202131855-7d9a097456b2/go.mod h1:siQvee7KIyFESzr5iQUVQavHCcSlzw2AjbdjchpDm4o= -github.com/longhorn/types v0.0.0-20241123075624-48c550af4eab h1:vW/sSHB0U/GPnornD1cdoKSEe9bdBwVKP68cZUs1Xh0= -github.com/longhorn/types v0.0.0-20241123075624-48c550af4eab/go.mod h1:dIPa2yMBPOa514bn3pohT7kbVAQcZbZSWItpgxunuPs= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= diff --git a/vendor/github.com/longhorn/types/pkg/generated/spdkrpc/spdk.pb.go b/vendor/github.com/longhorn/types/pkg/generated/spdkrpc/spdk.pb.go index 871232c4..338df5cd 100644 --- a/vendor/github.com/longhorn/types/pkg/generated/spdkrpc/spdk.pb.go +++ b/vendor/github.com/longhorn/types/pkg/generated/spdkrpc/spdk.pb.go @@ -1541,6 +1541,7 @@ type Engine struct { ErrorMsg string `protobuf:"bytes,14,opt,name=error_msg,json=errorMsg,proto3" json:"error_msg,omitempty"` TargetIp string `protobuf:"bytes,15,opt,name=target_ip,json=targetIp,proto3" json:"target_ip,omitempty"` TargetPort int32 `protobuf:"varint,16,opt,name=target_port,json=targetPort,proto3" json:"target_port,omitempty"` + StandbyTargetPort int32 `protobuf:"varint,17,opt,name=standby_target_port,json=standbyTargetPort,proto3" json:"standby_target_port,omitempty"` } func (x *Engine) Reset() { @@ -1687,6 +1688,13 @@ func (x *Engine) GetTargetPort() int32 { return 0 } +func (x *Engine) GetStandbyTargetPort() int32 { + if x != nil { + return x.StandbyTargetPort + } + return 0 +} + type EngineCreateRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -4435,7 +4443,7 @@ var file_spdkrpc_spdk_proto_rawDesc = []byte{ 0x75, 0x69, 0x6c, 0x64, 0x69, 0x6e, 0x67, 0x5f, 0x6c, 0x76, 0x6f, 0x6c, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x18, 0x64, 0x73, 0x74, 0x52, 0x65, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x69, 0x6e, 0x67, 0x4c, 0x76, 0x6f, 0x6c, 0x41, 0x64, 0x64, - 0x72, 0x65, 0x73, 0x73, 0x22, 0xbc, 0x06, 0x0a, 0x06, 0x45, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x12, + 0x72, 0x65, 0x73, 0x73, 0x22, 0xec, 0x06, 0x0a, 0x06, 0x45, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, @@ -4472,6 +4480,9 @@ var file_spdkrpc_spdk_proto_rawDesc = []byte{ 0x67, 0x65, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x49, 0x70, 0x12, 0x1f, 0x0a, 0x0b, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x10, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x74, 0x61, 0x72, + 0x67, 0x65, 0x74, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x2e, 0x0a, 0x13, 0x73, 0x74, 0x61, 0x6e, 0x64, + 0x62, 0x79, 0x5f, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x11, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x11, 0x73, 0x74, 0x61, 0x6e, 0x64, 0x62, 0x79, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x50, 0x6f, 0x72, 0x74, 0x1a, 0x44, 0x0a, 0x16, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, diff --git a/vendor/modules.txt b/vendor/modules.txt index c75df122..a44260b3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -73,7 +73,7 @@ github.com/longhorn/go-spdk-helper/pkg/spdk/setup github.com/longhorn/go-spdk-helper/pkg/spdk/types github.com/longhorn/go-spdk-helper/pkg/types github.com/longhorn/go-spdk-helper/pkg/util -# github.com/longhorn/types v0.0.0-20241123075624-48c550af4eab +# github.com/longhorn/types v0.0.0-20241123075624-48c550af4eab => github.com/derekbit/longhorn-types v0.0.0-20241128141620-e298a5a080b7 ## explicit; go 1.22.7 github.com/longhorn/types/pkg/generated/spdkrpc # github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 From e23d8b21a1f069410689b33c0f9ca08af5c75d29 Mon Sep 17 00:00:00 2001 From: Derek Su Date: Tue, 19 Nov 2024 13:45:41 +0800 Subject: [PATCH 2/3] feat(v2 upgrade): support engine live upgrade Longhorn 9104 Signed-off-by: Derek Su --- pkg/api/types.go | 2 + pkg/client/client.go | 3 +- pkg/spdk/engine.go | 438 ++++++++++++++++++++++++---------------- pkg/spdk/engine_test.go | 233 +++++++++++++++++++++ pkg/spdk/server.go | 2 +- pkg/spdk_test.go | 86 +------- 6 files changed, 507 insertions(+), 257 deletions(-) create mode 100644 pkg/spdk/engine_test.go diff --git a/pkg/api/types.go b/pkg/api/types.go index cb30a325..1f600307 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -129,6 +129,7 @@ type Engine struct { Port int32 `json:"port"` TargetIP string `json:"target_ip"` TargetPort int32 `json:"target_port"` + StandbyTargetPort int32 `json:"standby_target_port"` ReplicaAddressMap map[string]string `json:"replica_address_map"` ReplicaModeMap map[string]types.Mode `json:"replica_mode_map"` Head *Lvol `json:"head"` @@ -149,6 +150,7 @@ func ProtoEngineToEngine(e *spdkrpc.Engine) *Engine { Port: e.Port, TargetIP: e.TargetIp, TargetPort: e.TargetPort, + StandbyTargetPort: e.StandbyTargetPort, ReplicaAddressMap: e.ReplicaAddressMap, ReplicaModeMap: map[string]types.Mode{}, Head: ProtoLvolToLvol(e.Head), diff --git a/pkg/client/client.go b/pkg/client/client.go index 110069f8..8662fad1 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -474,7 +474,7 @@ func (c *SPDKClient) ReplicaRebuildingDstSnapshotRevert(name, snapshotName strin } func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap map[string]string, portCount int32, - initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (*api.Engine, error) { + initiatorAddress, targetAddress string, salvageRequested bool) (*api.Engine, error) { if name == "" || volumeName == "" || len(replicaAddressMap) == 0 { return nil, fmt.Errorf("failed to start SPDK engine: missing required parameters") } @@ -490,7 +490,6 @@ func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize ui ReplicaAddressMap: replicaAddressMap, Frontend: frontend, PortCount: portCount, - UpgradeRequired: upgradeRequired, TargetAddress: targetAddress, InitiatorAddress: initiatorAddress, SalvageRequested: salvageRequested, diff --git a/pkg/spdk/engine.go b/pkg/spdk/engine.go index 5a11e76d..42c79726 100644 --- a/pkg/spdk/engine.go +++ b/pkg/spdk/engine.go @@ -36,18 +36,19 @@ import ( type Engine struct { sync.RWMutex - Name string - VolumeName string - SpecSize uint64 - ActualSize uint64 - IP string - Port int32 - TargetIP string - TargetPort int32 - Frontend string - Endpoint string - Nqn string - Nguid string + Name string + VolumeName string + SpecSize uint64 + ActualSize uint64 + IP string + Port int32 // Port that initiator is connecting to + TargetIP string + TargetPort int32 // Port of the target that is used for letting initiator connect to + StandbyTargetPort int32 + Frontend string + Endpoint string + Nqn string + Nguid string ctrlrLossTimeout int fastIOFailTimeoutSec int @@ -113,10 +114,45 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU } } -func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (ret *spdkrpc.Engine, err error) { +func (e *Engine) isNewEngine() bool { + return e.IP == "" && e.TargetIP == "" && e.StandbyTargetPort == 0 +} + +func (e *Engine) checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP string) (bool, bool, error) { + initiatorCreationRequired, targetCreationRequired := false, false + var err error + + if podIP == initiatorIP && podIP == targetIP { + if e.Port == 0 && e.TargetPort == 0 { + e.log.Info("Creating both initiator and target instances") + initiatorCreationRequired = true + targetCreationRequired = true + } else if e.Port != 0 && e.TargetPort == 0 { + e.log.Info("Creating a target instance") + if e.StandbyTargetPort != 0 { + e.log.Warnf("Standby target instance with port %v is already created, will skip the target creation", e.StandbyTargetPort) + } else { + targetCreationRequired = true + } + } else { + e.log.Infof("Initiator instance with port %v and target instance with port %v are already created, will skip the creation", e.Port, e.TargetPort) + } + } else if podIP == initiatorIP { + e.log.Info("Creating an initiator instance") + initiatorCreationRequired = true + } else if podIP == targetIP { + e.log.Info("Creating a target instance") + targetCreationRequired = true + } else { + err = fmt.Errorf("invalid initiator and target addresses for engine %s creation with initiator address %v and target address %v", e.Name, initiatorIP, targetIP) + } + + return initiatorCreationRequired, targetCreationRequired, err +} + +func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, salvageRequested bool) (ret *spdkrpc.Engine, err error) { logrus.WithFields(logrus.Fields{ "portCount": portCount, - "upgradeRequired": upgradeRequired, "replicaAddressMap": replicaAddressMap, "initiatorAddress": initiatorAddress, "targetAddress": targetAddress, @@ -128,7 +164,6 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.Lock() defer func() { e.Unlock() - if requireUpdate { e.UpdateCh <- nil } @@ -143,6 +178,7 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str if err != nil { return nil, errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress) } + targetIP, _, err := splitHostPort(targetAddress) if err != nil { return nil, errors.Wrapf(err, "failed to split target address %v", targetAddress) @@ -177,29 +213,30 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str replicaBdevList := []string{} - initiatorCreationRequired := true - if !upgradeRequired { - if e.IP == "" { - if initiatorIP != targetIP { - // For creating target on another node - initiatorCreationRequired = false - e.log.Info("Creating an target engine") - e.TargetIP = podIP - } else { - // For newly creating engine - e.log.Info("Creating an new engine") - e.IP = podIP - e.TargetIP = podIP - } + initiatorCreationRequired, targetCreationRequired, err := e.checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP) + if err != nil { + return nil, err + } + if !initiatorCreationRequired && !targetCreationRequired { + return e.getWithoutLock(), nil + } - e.log = e.log.WithField("ip", e.IP) - } else { - if initiatorIP != targetIP { - return nil, errors.Errorf("unsupported operation: engine ip=%v, initiator address=%v, target address=%v", e.IP, initiatorAddress, targetAddress) - } + if e.isNewEngine() { + if initiatorCreationRequired { + e.IP = initiatorIP + } + e.TargetIP = targetIP + } - // For creating target on attached node - initiatorCreationRequired = false + e.log = e.log.WithFields(logrus.Fields{ + "initiatorIP": e.IP, + "targetIP": e.TargetIP, + }) + + if targetCreationRequired { + _, err := spdkClient.BdevRaidGet(e.Name, 0) + if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return nil, errors.Wrapf(err, "failed to get raid bdev %v during engine creation", e.Name) } if salvageRequested { @@ -231,18 +268,15 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.checkAndUpdateInfoFromReplicaNoLock() - e.log.Infof("Tried to connected all replicas %+v, then launching raid during engine creation", e.ReplicaStatusMap) + // TODO: improve the log message + e.log.Infof("Connecting all available replicas %+v, then launching raid during engine creation", e.ReplicaStatusMap) if _, err := spdkClient.BdevRaidCreate(e.Name, spdktypes.BdevRaidLevel1, 0, replicaBdevList); err != nil { return nil, err } } else { - // For reconstructing engine after switching over target to another node - initiatorCreationRequired = false - - e.IP = targetIP + e.log.Info("Skipping target creation during engine creation") - // Get ReplicaModeMap and ReplicaBdevNameMap - targetSPDKServiceAddress := net.JoinHostPort(e.IP, strconv.Itoa(types.SPDKServicePort)) + targetSPDKServiceAddress := net.JoinHostPort(e.TargetIP, strconv.Itoa(types.SPDKServicePort)) targetSPDKClient, err := GetServiceClient(targetSPDKServiceAddress) if err != nil { return nil, err @@ -253,21 +287,17 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str } }() - var engineWithTarget *api.Engine - if initiatorIP != targetIP { - engineWithTarget, err = targetSPDKClient.EngineGet(e.Name) - if err != nil { - return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress) - } - } else { - engineWithTarget = api.ProtoEngineToEngine(e.getWithoutLock()) + e.log.Info("Fetching replica list from target engine") + targetEngine, err := targetSPDKClient.EngineGet(e.Name) + if err != nil { + return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress) } for replicaName, replicaAddr := range replicaAddressMap { e.ReplicaStatusMap[replicaName] = &EngineReplicaStatus{ Address: replicaAddr, } - if _, ok := engineWithTarget.ReplicaAddressMap[replicaName]; !ok { + if _, ok := targetEngine.ReplicaAddressMap[replicaName]; !ok { e.log.WithError(err).Warnf("Failed to get bdev from replica %s with address %s during creation, will mark the mode to ERR and continue", replicaName, replicaAddr) e.ReplicaStatusMap[replicaName].Mode = types.ModeERR } else { @@ -276,12 +306,20 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.ReplicaStatusMap[replicaName].BdevName = replicaName } } - e.log = e.log.WithField("replicaStatusMap", e.ReplicaStatusMap) - e.log.Infof("Tried to re-connected all replicas %+v for engine reconstruction during upgrade", e.ReplicaStatusMap) + + e.log = e.log.WithField("replicaAddressMap", replicaAddressMap) + e.log.Infof("Connected all available replicas %+v for engine reconstruction during upgrade", e.ReplicaStatusMap) } - e.log.Info("Launching frontend during engine creation") - if err := e.handleFrontend(spdkClient, portCount, superiorPortAllocator, initiatorCreationRequired, upgradeRequired, initiatorAddress, targetAddress); err != nil { + log := e.log.WithFields(logrus.Fields{ + "initiatorCreationRequired": initiatorCreationRequired, + "targetCreationRequired": targetCreationRequired, + "initiatorAddress": initiatorAddress, + "targetAddress": targetAddress, + }) + + log.Info("Handling frontend during engine creation") + if err := e.handleFrontend(spdkClient, superiorPortAllocator, portCount, targetAddress, initiatorCreationRequired, targetCreationRequired); err != nil { return nil, err } @@ -366,7 +404,8 @@ func (e *Engine) filterSalvageCandidates(replicaAddressMap map[string]string) (m return filteredCandidates, nil } -func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) { +func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap, portCount int32, targetAddress string, + initiatorCreationRequired, targetCreationRequired bool) (err error) { if !types.IsFrontendSupported(e.Frontend) { return fmt.Errorf("unknown frontend type %s", e.Frontend) } @@ -376,9 +415,9 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, return nil } - initiatorIP, _, err := splitHostPort(initiatorAddress) - if err != nil { - return errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress) + standbyTargetCreationRequired := false + if e.Port != 0 && e.TargetPort == 0 { + standbyTargetCreationRequired = true } targetIP, targetPort, err := splitHostPort(targetAddress) @@ -387,34 +426,87 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, } e.Nqn = helpertypes.GetNQN(e.Name) + e.Nguid = commonutils.RandomID(nvmeNguidLength) + + dmDeviceBusy := false + port := int32(0) + initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) + if err != nil { + return errors.Wrapf(err, "failed to create initiator for engine %v", e.Name) + } - var port int32 - if !upgradeRequired { - e.Nguid = commonutils.RandomID(nvmeNguidLength) + defer func() { + if err == nil { + if !standbyTargetCreationRequired { + e.initiator = initiator + e.dmDeviceBusy = dmDeviceBusy + e.Endpoint = initiator.GetEndpoint() + e.log = e.log.WithFields(logrus.Fields{ + "endpoint": e.Endpoint, + "port": e.Port, + "targetPort": e.TargetPort, + }) + } - e.log.Info("Blindly stopping expose bdev for engine") - if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { - return errors.Wrapf(err, "failed to blindly stop expose bdev for engine %v", e.Name) + e.log.Infof("Finished handling frontend for engine: %+v", e) } + }() - port, _, err = superiorPortAllocator.AllocateRange(portCount) - if err != nil { - return err + if initiatorCreationRequired && !targetCreationRequired { + initiator.TransportAddress = targetIP + initiator.TransportServiceID = strconv.Itoa(int(targetPort)) + + e.log.Infof("Target instance already exists on %v, no need to create target instance", targetAddress) + e.Port = targetPort + + // TODO: + // "nvme list -o json" might be empty devices for a while instance manager pod is just started. + // The root cause is not clear, so we need to retry to load NVMe device info. + for r := 0; r < maxNumRetries; r++ { + err = initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN) + if err != nil && strings.Contains(err.Error(), "failed to get devices") { + time.Sleep(retryInterval) + continue + } + if err == nil { + e.log.Infof("Loaded NVMe device info for engine") + break + } + return errors.Wrapf(err, "failed to load NVMe device info for engine %v", e.Name) } - e.log.Infof("Allocated port %v", port) - if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil { - return err + err = initiator.LoadEndpoint(false) + if err != nil { + return errors.Wrapf(err, "failed to load endpoint for engine %v", e.Name) } - if initiatorCreationRequired { - e.Port = port - e.TargetPort = port + return nil + } + + e.log.Info("Blindly stopping expose bdev for engine") + if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { + return errors.Wrapf(err, "failed to blindly stop expose bdev for engine %v", e.Name) + } + + port, _, err = superiorPortAllocator.AllocateRange(portCount) + if err != nil { + return errors.Wrapf(err, "failed to allocate port for engine %v", e.Name) + } + e.log.Infof("Allocated port %v for engine", port) + + if initiatorCreationRequired { + e.Port = port + } + if targetCreationRequired { + if standbyTargetCreationRequired { + e.StandbyTargetPort = port } else { e.TargetPort = port } - } else { - e.Port = targetPort + } + + if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil { + return err } if e.Frontend == types.FrontendSPDKTCPNvmf { @@ -422,50 +514,17 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, return nil } - if initiatorIP != targetIP && !upgradeRequired { - e.log.Infof("Initiator IP %v is different from target IP %s, will not start initiator for engine", initiatorIP, targetIP) + if !initiatorCreationRequired && targetCreationRequired { + e.log.Infof("Only creating target instance for engine, no need to start initiator") return nil } - initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) - if err != nil { - return errors.Wrapf(err, "failed to create initiator for engine %v", e.Name) - } + e.log.Info("Starting initiator for engine") - dmDeviceBusy := false - if initiatorCreationRequired { - e.log.Info("Starting initiator for engine") - dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(port)), true) - if err != nil { - return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) - } - } else { - e.log.Info("Loading NVMe device info for engine") - err = initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN) - if err != nil { - if nvme.IsValidNvmeDeviceNotFound(err) { - dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(targetPort)), true) - if err != nil { - return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) - } - } else { - return errors.Wrapf(err, "failed to load NVMe device info for engine %v", e.Name) - } - } - err = initiator.LoadEndpoint(false) - if err != nil { - return errors.Wrapf(err, "failed to load endpoint for engine %v", e.Name) - } - //dmDeviceBusy = true + dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(port)), true) + if err != nil { + return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) } - e.initiator = initiator - e.dmDeviceBusy = dmDeviceBusy - e.Endpoint = initiator.GetEndpoint() - - e.log = e.log.WithFields(logrus.Fields{ - "endpoint": e.Endpoint, - "port": port, - }) return nil } @@ -573,6 +632,7 @@ func (e *Engine) getWithoutLock() (res *spdkrpc.Engine) { Port: e.Port, TargetIp: e.TargetIP, TargetPort: e.TargetPort, + StandbyTargetPort: e.StandbyTargetPort, Snapshots: map[string]*spdkrpc.Lvol{}, Frontend: e.Frontend, Endpoint: e.Endpoint, @@ -639,16 +699,8 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) { } }() - podIP, err := commonnet.GetIPForPod() - if err != nil { - return err - } - if e.IP != podIP { - // Skip the validation if the engine is being upgraded - if engineOnlyContainsInitiator(e) || engineOnlyContainsTarget(e) { - return nil - } - return fmt.Errorf("found mismatching between engine IP %s and pod IP %s for engine %v", e.IP, podIP, e.Name) + if e.IP != e.TargetIP { + return nil } if err := e.validateAndUpdateFrontend(subsystemMap); err != nil { @@ -2098,37 +2150,53 @@ func (e *Engine) Resume(spdkClient *spdkclient.Client) (err error) { } // SwitchOverTarget function in the Engine struct is responsible for switching the engine's target to a new address. -func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress string) (err error) { - e.log.Infof("Switching over engine to target address %s", targetAddress) +func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, newTargetAddress string) (err error) { + e.log.Infof("Switching over engine to target address %s", newTargetAddress) + + if newTargetAddress == "" { + return fmt.Errorf("invalid empty target address for engine %s target switchover", e.Name) + } currentTargetAddress := "" + podIP, err := commonnet.GetIPForPod() + if err != nil { + return errors.Wrapf(err, "failed to get IP for pod for engine %s target switchover", e.Name) + } + + newTargetIP, newTargetPort, err := splitHostPort(newTargetAddress) + if err != nil { + return errors.Wrapf(err, "failed to split target address %s for engine %s target switchover", newTargetAddress, e.Name) + } + e.Lock() defer func() { e.Unlock() if err != nil { - e.log.WithError(err).Warnf("Failed to switch over engine to target address %s", targetAddress) + e.log.WithError(err).Warnf("Failed to switch over engine to target address %s", newTargetAddress) - if disconnected, errCheck := e.IsTargetDisconnected(); errCheck != nil { - e.log.WithError(errCheck).Warnf("Failed to check if target %s is disconnected", targetAddress) + if disconnected, errCheck := e.isTargetDisconnected(); errCheck != nil { + e.log.WithError(errCheck).Warnf("Failed to check if target %s is disconnected", newTargetAddress) } else if disconnected { - if errConnect := e.connectTarget(currentTargetAddress); errConnect != nil { - e.log.WithError(errConnect).Warnf("Failed to connect target back to %s", currentTargetAddress) - } else { - e.log.Infof("Connected target back to %s", currentTargetAddress) - - if errReload := e.reloadDevice(); errReload != nil { - e.log.WithError(errReload).Warnf("Failed to reload device mapper") + if currentTargetAddress != "" { + if errConnect := e.connectTarget(currentTargetAddress); errConnect != nil { + e.log.WithError(errConnect).Warnf("Failed to connect target back to %s", currentTargetAddress) } else { - e.log.Infof("Reloaded device mapper for connecting target back to %s", currentTargetAddress) + e.log.Infof("Connected target back to %s", currentTargetAddress) + + if errReload := e.reloadDevice(); errReload != nil { + e.log.WithError(errReload).Warnf("Failed to reload device mapper") + } else { + e.log.Infof("Reloaded device mapper for connecting target back to %s", currentTargetAddress) + } } } } } else { e.ErrorMsg = "" - e.log.Infof("Switched over target to %s", targetAddress) + e.log.Infof("Switched over target to %s", newTargetAddress) } e.UpdateCh <- nil @@ -2139,6 +2207,7 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s return errors.Wrapf(err, "failed to create initiator for engine %s target switchover", e.Name) } + // Check if the engine is suspended before target switchover. suspended, err := initiator.IsSuspended() if err != nil { return errors.Wrapf(err, "failed to check if engine %s is suspended", e.Name) @@ -2147,6 +2216,7 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s return fmt.Errorf("engine %s must be suspended before target switchover", e.Name) } + // Load NVMe device info before target switchover. if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil { if !nvme.IsValidNvmeDeviceNotFound(err) { return errors.Wrapf(err, "failed to load NVMe device info for engine %s target switchover", e.Name) @@ -2154,37 +2224,40 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s } currentTargetAddress = net.JoinHostPort(initiator.TransportAddress, initiator.TransportServiceID) - if e.isSwitchOverTargetRequired(currentTargetAddress, targetAddress) { + if isSwitchOverTargetRequired(currentTargetAddress, newTargetAddress) { if currentTargetAddress != "" { if err := e.disconnectTarget(currentTargetAddress); err != nil { - return err + return errors.Wrapf(err, "failed to disconnect target %s for engine %s", currentTargetAddress, e.Name) } } - if err := e.connectTarget(targetAddress); err != nil { - return err + if err := e.connectTarget(newTargetAddress); err != nil { + return errors.Wrapf(err, "failed to connect target %s for engine %s", newTargetAddress, e.Name) } } // Replace IP and Port with the new target address. - // No need to update TargetIP and TargetPort, because target is not delete yet. - targetIP, targetPort, err := splitHostPort(targetAddress) - if err != nil { - return errors.Wrapf(err, "failed to split target address %s", targetAddress) - } + // No need to update TargetIP, because old target is not delete yet. + e.IP = newTargetIP + e.Port = newTargetPort - e.IP = targetIP - e.Port = targetPort + if newTargetIP == podIP { + e.TargetPort = newTargetPort + e.StandbyTargetPort = 0 + } else { + e.StandbyTargetPort = e.TargetPort + e.TargetPort = 0 + } e.log.Info("Reloading device mapper after target switchover") if err := e.reloadDevice(); err != nil { - return err + return errors.Wrapf(err, "failed to reload device mapper after engine %s target switchover", e.Name) } return nil } -func (e *Engine) IsTargetDisconnected() (bool, error) { +func (e *Engine) isTargetDisconnected() (bool, error) { initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) if err != nil { return false, errors.Wrapf(err, "failed to create initiator for checking engine %s target disconnected", e.Name) @@ -2311,30 +2384,32 @@ func (e *Engine) connectTarget(targetAddress string) error { return nil } -// DeleteTarget deletes the target +// DeleteTarget deletes the target instance func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap) (err error) { - e.log.Infof("Deleting target") + e.log.Infof("Deleting target with target port %d and standby target port %d", e.TargetPort, e.StandbyTargetPort) - if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { - return errors.Wrapf(err, "failed to stop expose bdev after engine %s target switchover", e.Name) + err = spdkClient.StopExposeBdev(e.Nqn) + if err != nil { + return errors.Wrapf(err, "failed to stop expose bdev while deleting target instance for engine %s", e.Name) } - if e.TargetPort != 0 { - if err := superiorPortAllocator.ReleaseRange(e.TargetPort, e.TargetPort); err != nil { - return err - } - e.TargetPort = 0 + err = e.releaseTargetAndStandbyTargetPorts(superiorPortAllocator) + if err != nil { + return errors.Wrapf(err, "failed to release target and standby target ports while deleting target instance for engine %s", e.Name) } - e.log.Infof("Deleting raid bdev %s before target switchover", e.Name) - if _, err := spdkClient.BdevRaidDelete(e.Name); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { - return errors.Wrapf(err, "failed to delete raid bdev after engine %s target switchover", e.Name) + e.log.Infof("Deleting raid bdev %s while deleting target instance", e.Name) + _, err = spdkClient.BdevRaidDelete(e.Name) + if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return errors.Wrapf(err, "failed to delete raid bdev after engine %s while deleting target instance", e.Name) } for replicaName, replicaStatus := range e.ReplicaStatusMap { - e.log.Infof("Disconnecting replica %s after target switchover", replicaName) - if err := disconnectNVMfBdev(spdkClient, replicaStatus.BdevName); err != nil { - e.log.WithError(err).Warnf("Engine failed to disconnect replica %s after target switchover, will mark the replica mode from %v to ERR", replicaName, replicaStatus.Mode) + e.log.Infof("Disconnecting replica %s while deleting target instance", replicaName) + err = disconnectNVMfBdev(spdkClient, replicaStatus.BdevName) + if err != nil { + e.log.WithError(err).Warnf("Engine failed to disconnect replica %s while deleting target instance, will mark the replica mode from %v to ERR", + replicaName, replicaStatus.Mode) replicaStatus.Mode = types.ModeERR } replicaStatus.BdevName = "" @@ -2342,14 +2417,29 @@ func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocat return nil } -func (e *Engine) isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool { +func isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool { return oldTargetAddress != newTargetAddress } -func engineOnlyContainsInitiator(e *Engine) bool { - return e.Port != 0 && e.TargetPort == 0 -} +func (e *Engine) releaseTargetAndStandbyTargetPorts(superiorPortAllocator *commonbitmap.Bitmap) error { + releaseTargetPortRequired := e.TargetPort != 0 + releaseStandbyTargetPortRequired := e.StandbyTargetPort != 0 && e.StandbyTargetPort != e.TargetPort + + // Release the target port + if releaseTargetPortRequired { + if err := superiorPortAllocator.ReleaseRange(e.TargetPort, e.TargetPort); err != nil { + return errors.Wrapf(err, "failed to release target port %d", e.TargetPort) + } + } + e.TargetPort = 0 -func engineOnlyContainsTarget(e *Engine) bool { - return e.Port == 0 && e.TargetPort != 0 + // Release the standby target port + if releaseStandbyTargetPortRequired { + if err := superiorPortAllocator.ReleaseRange(e.StandbyTargetPort, e.StandbyTargetPort); err != nil { + return errors.Wrapf(err, "failed to release standby target port %d", e.StandbyTargetPort) + } + } + e.StandbyTargetPort = 0 + + return nil } diff --git a/pkg/spdk/engine_test.go b/pkg/spdk/engine_test.go new file mode 100644 index 00000000..4780430f --- /dev/null +++ b/pkg/spdk/engine_test.go @@ -0,0 +1,233 @@ +package spdk + +import ( + "fmt" + + "github.com/sirupsen/logrus" + + commonbitmap "github.com/longhorn/go-common-libs/bitmap" + + . "gopkg.in/check.v1" +) + +func (s *TestSuite) TestCheckInitiatorAndTargetCreationRequirements(c *C) { + testCases := []struct { + name string + podIP string + initiatorIP string + targetIP string + port int32 + targetPort int32 + standbyTargetPort int32 + expectedInitiatorCreationRequired bool + expectedTargetCreationRequired bool + expectedError error + }{ + { + name: "Create both initiator and target instances", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: true, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Create local target instance on the node with initiator instance", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 8080, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Create local initiator instance only", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.2", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: true, + expectedTargetCreationRequired: false, + expectedError: nil, + }, + { + name: "Create local target instance on the node without initiator instance", + podIP: "192.168.1.2", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.2", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Invalid initiator and target addresses", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.2", + targetIP: "192.168.1.3", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: false, + expectedError: fmt.Errorf("invalid initiator and target addresses for engine test-engine creation with initiator address 192.168.1.2 and target address 192.168.1.3"), + }, + { + name: "Standby target instance is already created", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 100, + targetPort: 0, + standbyTargetPort: 105, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: false, + expectedError: nil, + }, + } + for testName, testCase := range testCases { + c.Logf("testing checkInitiatorAndTargetCreationRequirements.%v", testName) + + engine := &Engine{ + Port: testCase.port, + TargetPort: testCase.targetPort, + StandbyTargetPort: testCase.standbyTargetPort, + Name: "test-engine", + log: logrus.New(), + } + + initiatorCreationRequired, targetCreationRequired, err := engine.checkInitiatorAndTargetCreationRequirements(testCase.podIP, testCase.initiatorIP, testCase.targetIP) + + c.Assert(initiatorCreationRequired, Equals, testCase.expectedInitiatorCreationRequired, + Commentf("Test case '%s': unexpected initiator creation requirement", testCase.name)) + c.Assert(targetCreationRequired, Equals, testCase.expectedTargetCreationRequired, + Commentf("Test case '%s': unexpected target creation requirement", testCase.name)) + c.Assert(err, DeepEquals, testCase.expectedError, + Commentf("Test case '%s': unexpected error result", testCase.name)) + } +} + +func (s *TestSuite) TestIsNewEngine(c *C) { + testCases := []struct { + name string + engine *Engine + expected bool + }{ + { + name: "New engine with empty IP and TargetIP and StandbyTargetPort 0", + engine: &Engine{ + IP: "", + TargetIP: "", + StandbyTargetPort: 0, + }, + expected: true, + }, + { + name: "Engine with non-empty IP", + engine: &Engine{ + IP: "192.168.1.1", + TargetIP: "", + StandbyTargetPort: 0, + }, + expected: false, + }, + { + name: "Engine with non-empty TargetIP", + engine: &Engine{ + IP: "", + TargetIP: "192.168.1.2", + StandbyTargetPort: 0, + }, + expected: false, + }, + { + name: "Engine with non-zero StandbyTargetPort", + engine: &Engine{ + IP: "", + TargetIP: "", + StandbyTargetPort: 8080, + }, + expected: false, + }, + } + + for testName, testCase := range testCases { + c.Logf("testing isNewEngine.%v", testName) + result := testCase.engine.isNewEngine() + c.Assert(result, Equals, testCase.expected, Commentf("Test case '%s': unexpected result", testCase.name)) + } +} + +func (s *TestSuite) TestReleaseTargetAndStandbyTargetPorts(c *C) { + testCases := []struct { + name string + engine *Engine + expectedTargetPort int32 + expectedStandbyTargetPort int32 + expectedError error + }{ + { + name: "Release both target and standby target ports", + engine: &Engine{ + TargetPort: 2000, + StandbyTargetPort: 2005, + }, + expectedTargetPort: 0, + expectedStandbyTargetPort: 0, + expectedError: nil, + }, + { + name: "Release target port only but standby target port is not set", + engine: &Engine{ + TargetPort: 2000, + StandbyTargetPort: 0, + }, + expectedTargetPort: 0, + expectedStandbyTargetPort: 0, + expectedError: nil, + }, + { + name: "Release target and standby ports when they are the same", + engine: &Engine{ + TargetPort: 2000, + StandbyTargetPort: 2000, + }, + expectedTargetPort: 0, + expectedStandbyTargetPort: 0, + expectedError: nil, + }, + { + name: "Release snapshot target port only", + engine: &Engine{ + TargetPort: 0, + StandbyTargetPort: 2000, + }, + expectedTargetPort: 0, + expectedStandbyTargetPort: 0, + expectedError: nil, + }, + } + + for testName, testCase := range testCases { + c.Logf("testing releaseTargetAndStandbyTargetPorts.%v", testName) + + bitmap, err := commonbitmap.NewBitmap(0, 100000) + c.Assert(err, IsNil) + + err = testCase.engine.releaseTargetAndStandbyTargetPorts(bitmap) + c.Assert(err, DeepEquals, testCase.expectedError, Commentf("Test case '%s': unexpected error result", testCase.name)) + c.Assert(testCase.engine.TargetPort, Equals, testCase.expectedTargetPort, Commentf("Test case '%s': unexpected target port", testCase.name)) + c.Assert(testCase.engine.StandbyTargetPort, Equals, testCase.expectedStandbyTargetPort, Commentf("Test case '%s': unexpected standby target port", testCase.name)) + } +} diff --git a/pkg/spdk/server.go b/pkg/spdk/server.go index f079257f..b64d1db3 100644 --- a/pkg/spdk/server.go +++ b/pkg/spdk/server.go @@ -878,7 +878,7 @@ func (s *Server) EngineCreate(ctx context.Context, req *spdkrpc.EngineCreateRequ spdkClient := s.spdkClient s.Unlock() - return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired, req.SalvageRequested) + return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.SalvageRequested) } func localTargetExists(e *Engine) bool { diff --git a/pkg/spdk_test.go b/pkg/spdk_test.go index 1123fc6f..cc6e243c 100644 --- a/pkg/spdk_test.go +++ b/pkg/spdk_test.go @@ -268,7 +268,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { replica2.Name: types.ModeRW, } endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -351,7 +351,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -527,7 +527,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) { replica2.Name: types.ModeRW, } endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1225,7 +1225,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, // Restart the engine without the frontend err = spdkCli.EngineDelete(engineName) c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1240,7 +1240,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, // Restart the engine with the previous frontend err = spdkCli.EngineDelete(engineName) c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1351,7 +1351,7 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false) c.Assert(err, IsNil) c.Assert(engine.Endpoint, Equals, "") @@ -1375,77 +1375,3 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { err = spdkCli.ReplicaDelete(replicaName2, false) c.Assert(err, IsNil) } - -func (s *TestSuite) TestSPDKEngineCreateWithUpgradeRequired(c *C) { - fmt.Println("Testing SPDK Engine Creation with Upgrade Required") - - diskDriverName := "aio" - - ip, err := commonnet.GetAnyExternalIP() - c.Assert(err, IsNil) - os.Setenv(commonnet.EnvPodIP, ip) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ne, err := helperutil.NewExecutor(commontypes.ProcDirectory) - c.Assert(err, IsNil) - LaunchTestSPDKGRPCServer(ctx, c, ip, ne.Execute) - - loopDevicePath := PrepareDiskFile(c) - defer func() { - CleanupDiskFile(c, loopDevicePath) - }() - - spdkCli, err := client.NewSPDKClient(net.JoinHostPort(ip, strconv.Itoa(types.SPDKServicePort))) - c.Assert(err, IsNil) - - disk, err := spdkCli.DiskCreate(defaultTestDiskName, "", loopDevicePath, diskDriverName, int64(defaultTestBlockSize)) - c.Assert(err, IsNil) - c.Assert(disk.Path, Equals, loopDevicePath) - c.Assert(disk.Uuid, Not(Equals), "") - - defer func() { - err := spdkCli.DiskDelete(defaultTestDiskName, disk.Uuid, disk.Path, diskDriverName) - c.Assert(err, IsNil) - }() - - volumeName := "test-vol" - engineName := fmt.Sprintf("%s-engine", volumeName) - replicaName1 := fmt.Sprintf("%s-replica-1", volumeName) - replicaName2 := fmt.Sprintf("%s-replica-2", volumeName) - - replica1, err := spdkCli.ReplicaCreate(replicaName1, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - replica2, err := spdkCli.ReplicaCreate(replicaName2, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - - replicaAddressMap := map[string]string{ - replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), - replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), - } - - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) - c.Assert(err, IsNil) - - targetAddress := fmt.Sprintf("%s:%d", engine.TargetIP, engine.TargetPort) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true, false) - c.Assert(err, IsNil) - c.Assert(engine.Endpoint, Not(Equals), "") - // Initiator is not created, so the IP and Port should be empty - c.Assert(engine.IP, Equals, ip) - c.Assert(engine.Port, Not(Equals), int32(0)) - // Target is created and exposed - c.Assert(engine.TargetIP, Equals, ip) - c.Assert(engine.TargetPort, Not(Equals), int32(0)) - c.Assert(engine.Port, Equals, engine.TargetPort) - - // Tear down engine and replicas - err = spdkCli.EngineDelete(engineName) - c.Assert(err, IsNil) - - err = spdkCli.ReplicaDelete(replicaName1, false) - c.Assert(err, IsNil) - err = spdkCli.ReplicaDelete(replicaName2, false) - c.Assert(err, IsNil) -} From dd6a838c14525253d470dd983b2db6da0345a9ff Mon Sep 17 00:00:00 2001 From: Derek Su Date: Tue, 26 Nov 2024 11:38:36 +0800 Subject: [PATCH 3/3] fix(engine): no need to mark engine as error after failing to suspend or resume engine No need to mark engine as error after failing to suspend or resume engine. Longhorn manager will retry the operation later. Longhorn 9104 Signed-off-by: Derek Su --- pkg/spdk/engine.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/spdk/engine.go b/pkg/spdk/engine.go index 42c79726..6b2c3353 100644 --- a/pkg/spdk/engine.go +++ b/pkg/spdk/engine.go @@ -2089,8 +2089,7 @@ func (e *Engine) Suspend(spdkClient *spdkclient.Client) (err error) { if err != nil { if e.State != types.InstanceStateError { - e.State = types.InstanceStateError - e.log.WithError(err).Info("Failed to suspend engine, will mark the engine as error") + e.log.WithError(err).Warn("Failed to suspend engine") } e.ErrorMsg = err.Error() } else { @@ -2121,8 +2120,7 @@ func (e *Engine) Resume(spdkClient *spdkclient.Client) (err error) { if err != nil { if e.State != types.InstanceStateError { - e.State = types.InstanceStateError - e.log.WithError(err).Info("Failed to resume engine, will mark the engine as error") + e.log.WithError(err).Warn("Failed to resume engine") } e.ErrorMsg = err.Error() } else {