Skip to content

Commit

Permalink
Merge pull request #2240 from bakaphp/feat-amplitude-exporter
Browse files Browse the repository at this point in the history
feat: add amplitude download
  • Loading branch information
kaioken authored Oct 21, 2024
2 parents 910350c + fcf35bd commit de20efe
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 83 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/static-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ jobs:
TEST_APPLE_LOGIN_TOKEN: ${{ secrets.TEST_APPLE_LOGIN_TOKEN }}
TEST_APOLLO_KEY: ${{ secrets.TEST_APOLLO_KEY }}
TEST_STRIPE_SECRET_KEY: ${{ secrets.TEST_STRIPE_SECRET_KEY }}

TEST_AMPLITUDE_KEY: ${{ secrets.TEST_AMPLITUDE_KEY }}
TEST_AMPLITUDE_SECRET: ${{ secrets.TEST_AMPLITUDE_SECRET }}
strategy:
fail-fast: false
matrix:
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ jobs:
TEST_APPLE_LOGIN_TOKEN: ${{ secrets.TEST_APPLE_LOGIN_TOKEN }}
TEST_APOLLO_KEY: ${{ secrets.TEST_APOLLO_KEY }}
TEST_STRIPE_SECRET_KEY: ${{ secrets.TEST_STRIPE_SECRET_KEY }}
TEST_AMPLITUDE_KEY: ${{ secrets.TEST_AMPLITUDE_KEY }}
TEST_AMPLITUDE_SECRET: ${{ secrets.TEST_AMPLITUDE_SECRET }}
strategy:
fail-fast: false
matrix:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
<?php

declare(strict_types=1);

namespace App\Console\Commands\Connectors\Amplitude;

use Baka\Traits\KanvasJobsTrait;
use DateInterval;
use DatePeriod;
use DateTime;
use Google\Cloud\DiscoveryEngine\V1\Client\UserEventServiceClient;
use Google\Cloud\DiscoveryEngine\V1\DocumentInfo;
use Google\Cloud\DiscoveryEngine\V1\UserEvent;
use Google\Cloud\DiscoveryEngine\V1\WriteUserEventRequest;
use Google\Protobuf\Timestamp;
use Illuminate\Console\Command;
use Kanvas\Apps\Models\Apps;
use Kanvas\Companies\Models\Companies;
use Kanvas\Connectors\Amplitude\Client;
use Kanvas\Connectors\Google\Enums\ConfigurationEnum;
use Kanvas\Exceptions\ValidationException;
use Kanvas\Users\Models\UsersAssociatedApps;

class SyncEventsWithGoogleCommand extends Command
{
    // NOTE(review): no method of this trait is called in this class — confirm whether
    // the app service needs to be overwritten (overwriteAppService) before querying models.
    use KanvasJobsTrait;

    /**
     * The name and signature of the console command.
     *
     * start_date and end_date are parsed with DateTime; the range is walked one
     * day at a time and includes end_date.
     *
     * @var string
     */
    protected $signature = 'kanvas:amplitude-sync-events-to-google {app_id} {company_id} {start_date} {end_date}';

    /**
     * The console command description.
     *
     * @var string|null
     */
    protected $description = 'Sync all events from Amplitude to Google Dynamic Recommendation';

    /**
     * Export Amplitude events day by day and write the supported ones to Google
     * as user events.
     *
     * Events whose type is not mapped by mapEventType(), events without a
     * client_event_time, and events for which neither a matching user nor the
     * app's catch-all user can be found are skipped.
     *
     * @return int Command exit code (self::SUCCESS when the whole range was processed).
     *
     * @throws ValidationException When the app has no Google client or recommendation config.
     */
    public function handle(): int
    {
        $app = Apps::getById((int) $this->argument('app_id'));
        $company = Companies::getById((int) $this->argument('company_id'));
        $amplitudeClient = new Client($app, $company);

        $startDate = new DateTime($this->argument('start_date'));
        $endDate = new DateTime($this->argument('end_date'));

        // DatePeriod excludes its end boundary, so push a copy of the end date
        // one day out to make end_date inclusive.
        $interval = new DateInterval('P1D');
        $dateRange = new DatePeriod($startDate, $interval, (clone $endDate)->add($interval));

        $googleClientConfig = $app->get(ConfigurationEnum::GOOGLE_CLIENT_CONFIG->value);
        $googleRecommendationConfig = $app->get(ConfigurationEnum::GOOGLE_RECOMMENDATION_CONFIG->value);

        if (! $googleClientConfig) {
            throw new ValidationException('Google client config not found for app ' . $app->name);
        }

        if (! $googleRecommendationConfig) {
            throw new ValidationException('Google recommendation config not found for app ' . $app->name);
        }

        // The Google client and the data store name do not depend on the event,
        // so build them once instead of once per event.
        $googleClient = new UserEventServiceClient([
            'credentials' => $googleClientConfig,
        ]);
        $parent = UserEventServiceClient::dataStoreName(
            $googleRecommendationConfig['projectId'],
            $googleRecommendationConfig['location'],
            $googleRecommendationConfig['dataSource']
        );

        // The catch-all user is looked up lazily, at most once per run.
        $fallbackUser = null;
        $fallbackUserLoaded = false;

        foreach ($dateRange as $date) {
            $currentDate = $date->format('Y-m-d');
            $this->info("Syncing events for {$currentDate}");

            // Fetch events from Amplitude for the current day
            $results = $amplitudeClient->eventsExport($currentDate, $currentDate);

            foreach ($results as $result) {
                // Cast before trim(): under strict_types, trim(null) is a TypeError.
                $eventType = $this->mapEventType(trim((string) ($result['event_type'] ?? '')));

                if ($eventType === null) {
                    continue;
                }

                $clientEventTime = $result['client_event_time'] ?? null;

                if (! $clientEventTime) {
                    $this->warn("Skipping {$eventType} event without client_event_time");

                    continue;
                }

                $eventDateTime = new DateTime($clientEventTime);
                $displayname = $result['event_properties']['username'] ?? null;
                $messageId = (string) ($result['event_properties']['prompt_id'] ?? 0);

                // Only search by display name when the event carries one:
                // where('displayname', null) compiles to "displayname IS NULL" and
                // would attribute the event to an arbitrary user without a display name.
                $user = null;

                if ($displayname !== null && $displayname !== '') {
                    $user = UsersAssociatedApps::fromApp($app)->where('displayname', $displayname)->first();
                }

                if (! $user) {
                    // Use default user if no user found
                    if (! $fallbackUserLoaded) {
                        $fallbackUser = UsersAssociatedApps::fromApp($app)
                            ->where('users_id', $app->get('default_user_recommendation_catchall_id'))
                            ->first();
                        $fallbackUserLoaded = true;
                    }

                    $user = $fallbackUser;
                }

                if (! $user) {
                    continue;
                }

                $document = new DocumentInfo();
                $document->setId($messageId);

                $eventTime = new Timestamp();
                $eventTime->fromDateTime($eventDateTime);

                // Prepare the request message
                $userEvent = (new UserEvent())
                    ->setEventType($eventType)
                    ->setUserPseudoId((string) $user->users_id)
                    ->setDocuments([$document])
                    ->setEventTime($eventTime);

                $writeUserEventRequest = (new WriteUserEventRequest())
                    ->setParent($parent)
                    ->setUserEvent($userEvent);

                // Send event to Google
                $googleClient->writeUserEvent($writeUserEventRequest);
                $this->info('Event: ' . $eventType . ' for user ' . $user->displayname . ' at ' . $eventTime->toDateTime()->format('Y-m-d H:i:s'));
            }
        }

        $this->info('All events synced.');

        return self::SUCCESS;
    }

    /**
     * Translate an Amplitude event type into the event type sent to Google.
     *
     * @param string $amplitudeEventType Trimmed Amplitude event_type value.
     *
     * @return string|null The Google event type, or null when the event is not synced.
     */
    private function mapEventType(string $amplitudeEventType): ?string
    {
        return match ($amplitudeEventType) {
            'View Explore', 'View Library' => 'view-home-page',
            'Page Viewed', 'Select Prompt', '[Amplitude] Page Viewed' => 'view-item',
            default => null,
        };
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace App\Console\Commands\Connectors\Apollo;

use Baka\Traits\KanvasJobsTrait;
use Carbon\Carbon;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Cache;
use Kanvas\Apps\Models\Apps;
Expand All @@ -22,7 +21,7 @@ class SyncAllPeopleInCompanyCommand extends Command
*
* @var string
*/
protected $signature = 'kanvas:guild-apollo-people-sync {app_id} {company_id} {total=200} {perPage=200}';
protected $signature = 'kanvas:guild-apollo-people-sync {app_id} {company_id} {total=150} {perPage=50}';

/**
* The console command description.
Expand All @@ -44,100 +43,87 @@ public function handle()
$this->overwriteAppService($app);
$company = Companies::getById((int) $this->argument('company_id'));

$hourlyRateLimit = 400; // Maximum API calls per hour
$dailyRateLimit = 2000; // Maximum API calls per day
$batchSize = 100; // Number of people to process per batch
$hourlyRateLimit = 400;
$dailyRateLimit = 2000;
$batchSize = 100;
$hourlyCacheKey = 'api_hourly_rate_limit_' . $app->getId();
$dailyCacheKey = 'api_daily_rate_limit_' . $app->getId();
$resetHourlyKey = 'api_hourly_rate_limit_reset_' . $app->getId();
$resetDailyKey = 'api_daily_rate_limit_reset_' . $app->getId();
$hourlyTimeWindow = 60 * 60; // 1 hour in seconds
$dailyTimeWindow = 24 * 60 * 60; // 24 hours in seconds
$hourlyTimeWindow = 60 * 60;
$dailyTimeWindow = 24 * 60 * 60;

// Check the current count of API calls
$currentHourlyCount = Cache::get($hourlyCacheKey, 0);
$currentDailyCount = Cache::get($dailyCacheKey, 0);
$resetHourlyTimestamp = Cache::get($resetHourlyKey);
$resetDailyTimestamp = Cache::get($resetDailyKey);

$this->line('Syncing ' . $currentHourlyCount . ' people in company ' . $company->name . ' from app ' . $app->name . ' total ' . $total . ' per page ' . $perPage);

if ($resetHourlyTimestamp) {
$resetHourlyTime = Carbon::parse($resetHourlyTimestamp);
$currentTimestamp = now()->timestamp;
$hourlyWaitTime = $resetHourlyTime->timestamp - $currentTimestamp;

if ($currentHourlyCount >= $hourlyRateLimit && $hourlyWaitTime > 0) {
$this->line("Hourly rate limit reached. Please wait $hourlyWaitTime seconds to run the process again.");

return;
}
// Reset hourly/daily counters if time window has expired
if (now()->timestamp >= Cache::get($resetHourlyKey, 0)) {
Cache::put($hourlyCacheKey, 0, $hourlyTimeWindow);
}

if ($resetDailyTimestamp) {
$resetDailyTime = Carbon::parse($resetDailyTimestamp);
$currentTimestamp = now()->timestamp;
$dailyWaitTime = $resetDailyTime->timestamp - $currentTimestamp;

if ($currentDailyCount >= $dailyRateLimit && $dailyWaitTime > 0) {
$this->line("Daily rate limit reached. Please wait $dailyWaitTime seconds to run the process again.");

return;
}
if (now()->timestamp >= Cache::get($resetDailyKey, 0)) {
Cache::put($dailyCacheKey, 0, $dailyTimeWindow);
}
People::fromApp($app)
->fromCompany($company)
->notDeleted(0)
->orderBy('peoples.id', 'DESC')
->limit($total)
->chunk($perPage, function ($peoples) use (&$currentHourlyCount, &$currentDailyCount, $hourlyRateLimit, $dailyRateLimit, $hourlyCacheKey, $dailyCacheKey, $resetHourlyKey, $resetDailyKey, $hourlyTimeWindow, $dailyTimeWindow) {
foreach ($peoples as $people) {
// Check if the person has the 'APOLLO_DATA_ENRICHMENT_CUSTOM_FIELDS' custom field
$hasCustomField = $people->get(ConfigurationEnum::APOLLO_DATA_ENRICHMENT_CUSTOM_FIELDS->value);

if ($hasCustomField) {
// Skip this record if the custom field exists
continue;
}

// Process the record if the custom field does not exist
if ($currentHourlyCount >= $hourlyRateLimit) {
Cache::put($resetHourlyKey, now()->addSeconds($hourlyTimeWindow), $hourlyTimeWindow);
$this->line("Hourly rate limit reached. Please wait $hourlyTimeWindow seconds to run the process again.");

return false;
}
$currentHourlyCount = Cache::get($hourlyCacheKey, 0);
$currentDailyCount = Cache::get($dailyCacheKey, 0);

if ($currentDailyCount >= $dailyRateLimit) {
Cache::put($resetDailyKey, now()->addSeconds($dailyTimeWindow), $dailyTimeWindow);
$this->line("Daily rate limit reached. Please wait $dailyTimeWindow seconds to run the process again.");
$this->line("Syncing people for company {$company->name} from app {$app->name}, total {$total}, per page {$perPage}");

return false;
People::fromApp($app)
->fromCompany($company)
->notDeleted(0)
->orderBy('peoples.id', 'DESC')
->limit($total)
->chunk($perPage, function ($peoples) use (&$currentHourlyCount, &$currentDailyCount, $hourlyRateLimit, $dailyRateLimit, $hourlyCacheKey, $dailyCacheKey, $resetHourlyKey, $resetDailyKey, $hourlyTimeWindow, $dailyTimeWindow) {
foreach ($peoples as $people) {
$hasCustomField = $people->get(ConfigurationEnum::APOLLO_DATA_ENRICHMENT_CUSTOM_FIELDS->value);
if ($hasCustomField) {
continue;
}

if ($currentHourlyCount >= $hourlyRateLimit) {
Cache::put($resetHourlyKey, now()->addSeconds($hourlyTimeWindow), $hourlyTimeWindow);
$this->line('Hourly rate limit reached. Waiting for reset...');
sleep($hourlyTimeWindow);

continue;
}

if ($currentDailyCount >= $dailyRateLimit) {
Cache::put($resetDailyKey, now()->addSeconds($dailyTimeWindow), $dailyTimeWindow);
$this->line('Daily rate limit reached. Waiting for reset...');
sleep($dailyTimeWindow);

continue;
}

$this->line("Syncing people {$people->id}: {$people->firstname} {$people->lastname}");

$people->fireWorkflow(
WorkflowEnum::UPDATED->value,
true,
['app' => $people->app]
);

$currentHourlyCount++;
$currentDailyCount++;

Cache::put($hourlyCacheKey, $currentHourlyCount, $hourlyTimeWindow);
Cache::put($dailyCacheKey, $currentDailyCount, $dailyTimeWindow);

// Dynamic delay based on remaining rate limit
$delay = $this->calculateDelay($currentHourlyCount, $hourlyRateLimit);
sleep($delay);
}
});

$this->line('Syncing people ' . $people->id . ' ' . $people->firstname . ' ' . $people->lastname);

$people->fireWorkflow(
WorkflowEnum::UPDATED->value,
true,
[
'app' => $people->app,
]
);
//$people->clearLightHouseCacheJob();

// Increment the count and update cache for rate limiting
$currentHourlyCount++;
$currentDailyCount++;
Cache::put($hourlyCacheKey, $currentHourlyCount, $hourlyTimeWindow);
Cache::put($dailyCacheKey, $currentDailyCount, $dailyTimeWindow);

usleep(100000); // 100ms delay between each request
}
});
$this->line("All people for company {$company->name} from app {$app->name} synced");
}

$this->line('All people in company ' . $company->name . ' from app ' . $app->name . ' synced');
/**
 * Compute how many seconds to sleep before the next API call.
 *
 * Divides a fixed one-hour window by the number of calls still allowed this
 * hour, so the pause grows as the remaining quota shrinks.
 * NOTE(review): the window is always a full hour regardless of how much of the
 * hour has already elapsed — confirm this is the intended pacing.
 *
 * @param int $currentCount Calls already made in the current hourly window.
 * @param int $rateLimit    Maximum calls allowed per hour.
 *
 * @return int Delay in seconds; 2 when no quota remains.
 */
private function calculateDelay(int $currentCount, int $rateLimit): int
{
    // Adjust delay dynamically to distribute requests evenly
    $remainingRequests = $rateLimit - $currentCount;
    $remainingTime = 60 * 60; // 1 hour in seconds

    return $remainingRequests > 0 ? intdiv($remainingTime, $remainingRequests) : 2;
}
}
11 changes: 11 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,17 @@
"@php artisan key:generate --ansi"
],
"test": "php artisan test",
"migrate-kanvas-all": [
"php artisan migrate",
"php artisan migrate --path database/migrations/Inventory/ --database inventory",
"php artisan migrate --path database/migrations/Social/ --database social",
"php artisan migrate --path database/migrations/Guild/ --database crm",
"php artisan migrate --path database/migrations/Souk/ --database commerce",
"php artisan migrate --path database/migrations/ActionEngine/ --database action_engine",
"php artisan migrate --path database/migrations/Subscription/ --database mysql",
"php artisan migrate --path database/migrations/Event/ --database event",
"php artisan migrate --path database/migrations/Workflow/ --database workflow"
],
"migrate-kanvas": [
"php artisan migrate"
],
Expand Down
Loading

0 comments on commit de20efe

Please sign in to comment.