Skip to content

Commit

Permalink
Merge pull request #58 from SwiftPackageIndex/concurrent-redirect-checks
Browse files Browse the repository at this point in the history
Concurrent redirect checks
  • Loading branch information
finestructure committed Jan 12, 2024
2 parents cfe4b67 + 74ceb84 commit 01f5dcd
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 32 deletions.
88 changes: 56 additions & 32 deletions Sources/ValidatorCore/Commands/CheckRedirects.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import AsyncHTTPClient

extension Validator {
struct CheckRedirects: AsyncParsableCommand {
@Option(name: .shortAndLong, help: "number of checks to run in parallel")
var concurrency: Int?

@Option(name: .shortAndLong, help: "read input from file")
var input: String?

Expand Down Expand Up @@ -66,11 +69,12 @@ extension Validator {
}
}

static var normalizedPackageURLs = NormalizedPackageURLs(inputURLs: [])

static func process(redirect: Redirect,
verbose: Bool,
index: Int,
packageURL: PackageURL,
normalized: inout Set<String>) throws -> PackageURL? {
packageURL: PackageURL) async throws -> PackageURL? {
if verbose || index % 50 == 0 {
print("package \(index) ...")
fflush(stdout)
Expand All @@ -92,13 +96,13 @@ extension Validator {
case .rateLimited:
fatalError("rate limited - should have been retried at a lower level")
case .redirected(let url):
guard !normalized.contains(url.normalized()) else {
if await normalizedPackageURLs.insert(url).inserted {
print("ADD \(packageURL) -> \(url) (new)")
return url
} else {
print("DELETE \(packageURL) -> \(url) (exists)")
return nil
}
print("ADD \(packageURL) -> \(url) (new)")
normalized.insert(url.normalized())
return url
case .unauthorized:
print("package \(index) ...")
print("UNAUTHORIZED: \(packageURL.absoluteString) (deleting package)")
Expand All @@ -107,6 +111,16 @@ extension Validator {
}

func run() async throws {
let start = Date()
defer {
let elapsed = Date().timeIntervalSince(start)
if elapsed < 120 {
print("Elapsed (/s):", elapsed)
} else {
print("Elapsed (/min):", elapsed/60)
}
}

let verbose = verbose
let inputURLs = try inputSource.packageURLs()
let prefix = limit ?? inputURLs.count
Expand All @@ -121,39 +135,49 @@ extension Validator {
print("Chunk \(chunk) of \(numberOfChunks)")
}

var normalized = Set(inputURLs.map { $0.normalized() })
var updated = [PackageURL]()

for (index, packageURL) in inputURLs[offset...]
.prefix(prefix)
.chunk(index: chunk, of: numberOfChunks)
.enumerated() {
let index = index + offset
let redirect = try await resolvePackageRedirects(client: httpClient, for: packageURL)

if index % 100 == 0, let token = Current.githubToken() {
let rateLimit = try await Github.getRateLimit(client: httpClient, token: token).get()
if rateLimit.remaining < 200 {
print("Rate limit remaining: \(rateLimit.remaining)")
print("Sleeping until reset at \(rateLimit.resetDate) ...")
sleep(UInt32(rateLimit.secondsUntilReset + 0.5))
Self.normalizedPackageURLs = .init(inputURLs: inputURLs)

let semaphore = Semaphore(maximum: concurrency ?? 1)

let updated = try await withThrowingTaskGroup(of: PackageURL?.self) { group in
for (index, packageURL) in inputURLs[offset...]
.prefix(prefix)
.chunk(index: chunk, of: numberOfChunks)
.enumerated() {
await semaphore.increment()
try? await semaphore.waitForAvailability()
group.addTask {
let index = index + offset
let redirect = try await resolvePackageRedirects(client: httpClient, for: packageURL)

if index % 100 == 0, let token = Current.githubToken() {
let rateLimit = try await Github.getRateLimit(client: httpClient, token: token).get()
if rateLimit.remaining < 200 {
print("Rate limit remaining: \(rateLimit.remaining)")
print("Sleeping until reset at \(rateLimit.resetDate) ...")
sleep(UInt32(rateLimit.secondsUntilReset + 0.5))
}
}

let res = try await Self.process(redirect: redirect,
verbose: verbose,
index: index,
packageURL: packageURL)

await semaphore.decrement()
return res
}
}

if let res = try Self.process(redirect: redirect,
verbose: verbose,
index: index,
packageURL: packageURL,
normalized: &normalized) {
updated.append(res)
}
return try await group
.compactMap { $0 }
.reduce(into: [], { res, next in res.append(next) })
.sorted(by: { $0.lowercased() < $1.lowercased() })
}

updated.sort(by: { $0.lowercased() < $1.lowercased() })

if let path = output {
try Current.fileManager.saveList(updated, path: path)
}
}
}
}

16 changes: 16 additions & 0 deletions Sources/ValidatorCore/NormalizedPackageURLs.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/// Actor wrapping a set of normalized package URL strings so that lookups
/// and insertions coming from concurrent tasks are serialized.
actor NormalizedPackageURLs {
    /// Normalized string form of every URL recorded so far.
    var normalized: Set<String>

    /// Seeds the set with the normalized form of each URL in `inputURLs`.
    init(inputURLs: [PackageURL]) {
        var seeded = Set<String>()
        for packageURL in inputURLs {
            seeded.insert(packageURL.normalized())
        }
        normalized = seeded
    }

    /// Returns `true` when the normalized form of `url` is already recorded.
    func contains(_ url: PackageURL) -> Bool {
        let key = url.normalized()
        return normalized.contains(key)
    }

    /// Records the normalized form of `url`.
    ///
    /// - Returns: The result of `Set.insert`: `inserted` is `true` when the
    ///   normalized string was not present before, and `memberAfterInsert`
    ///   is the string held by the set afterwards.
    @discardableResult
    func insert(_ url: PackageURL) -> (inserted: Bool, memberAfterInsert: String) {
        let key = url.normalized()
        return normalized.insert(key)
    }
}
39 changes: 39 additions & 0 deletions Sources/ValidatorCore/Semaphore.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright Dave Verwer, Sven A. Schmidt, and other contributors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import Foundation
#if os(Linux)
import CDispatch // for NSEC_PER_SEC https://github.com/apple/swift-corelibs-libdispatch/issues/659
#endif


/// Polling-based counter used to cap how many units of work are in flight.
///
/// Callers bump the counter with `increment()`, pause in
/// `waitForAvailability()` while the counter is above `maximum`, and call
/// `decrement()` once their work has finished.
actor Semaphore {
    /// Current value of the counter.
    var current = 0
    /// Highest counter value that is still considered available.
    var maximum: Int
    /// Polling interval, in seconds, used by `waitForAvailability()`.
    var granularity: Double

    init(maximum: Int, granularity: Double = 0.01) {
        self.granularity = granularity
        self.maximum = maximum
    }

    /// `true` while the counter exceeds `maximum`.
    var unavailable: Bool {
        return !(current <= maximum)
    }

    /// Raises the counter by one.
    func increment() {
        current = current + 1
    }

    /// Lowers the counter by one.
    func decrement() {
        current = current - 1
    }

    /// Suspends, re-checking once per `granularity` seconds, until the
    /// counter is no longer above `maximum`.
    ///
    /// - Throws: Whatever `Task.sleep(nanoseconds:)` throws while waiting.
    func waitForAvailability() async throws {
        while unavailable {
            // Converted on every pass, and only when a sleep is actually needed.
            let pollInterval = UInt64(granularity * Double(NSEC_PER_SEC))
            try await Task.sleep(nanoseconds: pollInterval)
        }
    }
}
52 changes: 52 additions & 0 deletions Tests/ValidatorTests/CheckRedirectTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright Dave Verwer, Sven A. Schmidt, and other contributors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import XCTest

@testable import ValidatorCore


/// Tests for how `Validator.CheckRedirects.process` treats a `.redirected`
/// result, depending on whether the redirect target is already recorded in
/// the static `normalizedPackageURLs`.
final class CheckRedirectTests: XCTestCase {

    func test_process_new_redirect() async throws {
        // Arrange: only p1 is recorded, so the redirect target p2 is new.
        Validator.CheckRedirects.normalizedPackageURLs = .init(inputURLs: [.p1])

        // Act
        let outcome = try await Validator.CheckRedirects.process(
            redirect: .redirected(to: .p2),
            verbose: true,
            index: 0,
            packageURL: .p1
        )

        // Assert: the new target is handed back to the caller.
        XCTAssertEqual(outcome, .p2)
    }

    func test_process_existing_redirect() async throws {
        // Arrange: both p1 and the redirect target p2 are already recorded.
        Validator.CheckRedirects.normalizedPackageURLs = .init(inputURLs: [.p1, .p2])

        // Act
        let outcome = try await Validator.CheckRedirects.process(
            redirect: .redirected(to: .p2),
            verbose: true,
            index: 0,
            packageURL: .p1
        )

        // Assert: nothing is returned for a target that is already present.
        XCTAssertEqual(outcome, nil)
    }

}


// Fixture package URLs shared by the tests in this file.
private extension PackageURL {
    /// First fixture URL (`org/1`).
    static let p1: PackageURL = .init(argument: "https://github.com/org/1.git")!
    /// Second fixture URL (`org/2`).
    static let p2: PackageURL = .init(argument: "https://github.com/org/2.git")!
}

0 comments on commit 01f5dcd

Please sign in to comment.