From 150395bc14c4a3506e2e4bb78be9bb1e3636f2dc Mon Sep 17 00:00:00 2001 From: kaioken Date: Sun, 20 Oct 2024 19:13:57 -0400 Subject: [PATCH 1/3] feat: add amplitude download --- .github/workflows/static-analysis.yml | 3 +- .github/workflows/tests.yml | 2 + composer.json | 11 ++ src/Domains/Connectors/Amplitude/Client.php | 108 ++++++++++++++++++ .../Amplitude/DownloadEventTest.php | 32 ++++++ 5 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 src/Domains/Connectors/Amplitude/Client.php create mode 100644 tests/Connectors/Integration/Amplitude/DownloadEventTest.php diff --git a/.github/workflows/static-analysis.yml b/.github/workflows/static-analysis.yml index f52546b64..ba171d8d7 100644 --- a/.github/workflows/static-analysis.yml +++ b/.github/workflows/static-analysis.yml @@ -62,7 +62,8 @@ jobs: TEST_APPLE_LOGIN_TOKEN: ${{ secrets.TEST_APPLE_LOGIN_TOKEN }} TEST_APOLLO_KEY: ${{ secrets.TEST_APOLLO_KEY }} TEST_STRIPE_SECRET_KEY: ${{ secrets.TEST_STRIPE_SECRET_KEY }} - + TEST_AMPLITUDE_KEY: ${{ secrets.TEST_AMPLITUDE_KEY }} + TEST_AMPLITUDE_SECRET: ${{ secrets.TEST_AMPLITUDE_SECRET }} strategy: fail-fast: false matrix: diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 315ef7a05..b66ef0363 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -71,6 +71,8 @@ jobs: TEST_APPLE_LOGIN_TOKEN: ${{ secrets.TEST_APPLE_LOGIN_TOKEN }} TEST_APOLLO_KEY: ${{ secrets.TEST_APOLLO_KEY }} TEST_STRIPE_SECRET_KEY: ${{ secrets.TEST_STRIPE_SECRET_KEY }} + TEST_AMPLITUDE_KEY: ${{ secrets.TEST_AMPLITUDE_KEY }} + TEST_AMPLITUDE_SECRET: ${{ secrets.TEST_AMPLITUDE_SECRET }} strategy: fail-fast: false matrix: diff --git a/composer.json b/composer.json index 32b1edcfe..67136360d 100644 --- a/composer.json +++ b/composer.json @@ -108,6 +108,17 @@ "@php artisan key:generate --ansi" ], "test": "php artisan test", + "migrate-kanvas-all": [ + "php artisan migrate", + "php artisan migrate --path database/migrations/Inventory/ --database inventory", + "php artisan migrate --path database/migrations/Social/ --database social", + "php artisan migrate --path database/migrations/Guild/ --database crm", + "php artisan migrate --path database/migrations/Souk/ --database commerce", + "php artisan migrate --path database/migrations/ActionEngine/ --database action_engine", + "php artisan migrate --path database/migrations/Subscription/ --database mysql", + "php artisan migrate --path database/migrations/Event/ --database event", + "php artisan migrate --path database/migrations/Workflow/ --database workflow" + ], "migrate-kanvas": [ "php artisan migrate" ], diff --git a/src/Domains/Connectors/Amplitude/Client.php b/src/Domains/Connectors/Amplitude/Client.php new file mode 100644 index 000000000..eeea7fcf3 --- /dev/null +++ b/src/Domains/Connectors/Amplitude/Client.php @@ -0,0 +1,108 @@ +get('amplitude_api_key') || ! $app->get('amplitude_api_secret')) { + throw new ValidationException('Amplitude API key or secret is not set in app settings.'); + } + + $this->apiKey = $app->get('amplitude_api_key'); + $this->apiSecret = $app->get('amplitude_api_secret'); + } + + /** + * Stream the export data from Amplitude in chunks with a 1-week limit. + */ + public function eventsExport(string $startDate, string $endDate): array + { + $start = new DateTime($startDate); + $end = new DateTime($endDate); + + $interval = $start->diff($end); + if ($interval->days > 7) { + throw new ValidationException('The date range cannot exceed 7 days.'); + } + + // Convert to the required Amplitude format Ymd\TH + $startFormatted = $start->format('Ymd\T00'); + $endFormatted = $end->format('Ymd\T23'); // Assuming you want to include the entire last day + + $path = '/export'; + $params = [ + 'start' => $startFormatted, + 'end' => $endFormatted, + ]; + + $response = Http::withBasicAuth($this->apiKey, $this->apiSecret) + ->get($this->baseUrl . $path, $params); + + if ($response->failed()) { + throw new ValidationException('Failed to fetch data from Amplitude API.'); + } + + // Save the streamed ZIP response to a temporary file + $tempZipFile = tempnam(sys_get_temp_dir(), 'amplitude_export_'); + file_put_contents($tempZipFile, $response->body()); // Save the entire body to the temp file + + // Extract the ZIP archive + $zip = new ZipArchive(); + if ($zip->open($tempZipFile) === true) { + $extractedData = []; + + // Loop through all the files in the ZIP archive + for ($i = 0; $i < $zip->numFiles; $i++) { + $filename = $zip->getNameIndex($i); + $fileContents = $zip->getFromIndex($i); + + // If the file is a .json.gz, we need to decompress it + if (strpos($filename, '.json.gz') !== false) { + $decompressed = gzdecode($fileContents); // Decompress the GZIP file + + if ($decompressed === false) { + throw new ValidationException("Failed to decompress GZIP file: $filename"); + } + + // Split decompressed data by newlines (assuming NDJSON format) + $lines = explode("\n", $decompressed); + + // Process each line as a JSON object + foreach ($lines as $line) { + if (! empty($line)) { + $data = json_decode($line, true); + if ($data !== null) { + $extractedData[] = $data; // Collect the JSON data + } + } + } + } + } + $zip->close(); + + // Clean up the temporary ZIP file + unlink($tempZipFile); + + return $extractedData; + } else { + throw new ValidationException('Failed to open ZIP file.'); + } + } +} diff --git a/tests/Connectors/Integration/Amplitude/DownloadEventTest.php b/tests/Connectors/Integration/Amplitude/DownloadEventTest.php new file mode 100644 index 000000000..ecb103238 --- /dev/null +++ b/tests/Connectors/Integration/Amplitude/DownloadEventTest.php @@ -0,0 +1,32 @@ +user(); + $company = $user->getCurrentCompany(); + + $app->set('amplitude_api_key', getenv('TEST_AMPLITUDE_KEY')); + $app->set('amplitude_api_secret', getenv('TEST_AMPLITUDE_SECRET')); + $amplitudeClient = new Client($app, $company); + + $startDate = date('Ymd\TH', strtotime('2024-10-17')) . '22'; // Start at 22:00 of the previous day + $endDate = date('Ymd\TH', strtotime('2024-10-17')) . '23'; // End at 23:59 of the previous day + + $results = $amplitudeClient->eventsExport($startDate, $endDate); + + $this->assertIsArray($results); + $this->assertNotEmpty($results); + $this->assertArrayHasKey('event_properties', $results[0]); + } +} From 838f5df33cb5eedcadff69cc3833908868d5da95 Mon Sep 17 00:00:00 2001 From: kaioken Date: Sun, 20 Oct 2024 23:25:25 -0400 Subject: [PATCH 2/3] feat: add amplitude sync ocmmando --- .../Amplitude/SyncEventsWithGoogleCommand.php | 132 +++++++++++++++ .../Apollo/SyncAllPeopleInCompanyCommand.php | 150 ++++++++---------- 2 files changed, 200 insertions(+), 82 deletions(-) create mode 100644 app/Console/Commands/Connectors/Amplitude/SyncEventsWithGoogleCommand.php diff --git a/app/Console/Commands/Connectors/Amplitude/SyncEventsWithGoogleCommand.php b/app/Console/Commands/Connectors/Amplitude/SyncEventsWithGoogleCommand.php new file mode 100644 index 000000000..486cc62f6 --- /dev/null +++ b/app/Console/Commands/Connectors/Amplitude/SyncEventsWithGoogleCommand.php @@ -0,0 +1,132 @@ +argument('app_id')); + $company = Companies::getById((int) $this->argument('company_id')); + + $amplitudeClient = new Client($app, $company); + + $this->info('Syncing events from Amplitude to Google Dynamic Recommendation for app ' . $app->name . ' during ' . $this->argument('start_date') . ' to ' . $this->argument('end_date')); + + $results = $amplitudeClient->eventsExport($this->argument('start_date'), $this->argument('end_date')); + $googleClientConfig = $app->get(ConfigurationEnum::GOOGLE_CLIENT_CONFIG->value); + $googleRecommendationConfig = $app->get(ConfigurationEnum::GOOGLE_RECOMMENDATION_CONFIG->value); + + if (! $googleClientConfig) { + throw new ValidationException('Google client config not found for app ' . $$app->name); + } + + if (! $googleRecommendationConfig) { + throw new ValidationException('Google recommendation config not found for app ' . $$app->name); + } + + $googleClientConfig = $googleClientConfig; + $googleRecommendationConfig = $googleRecommendationConfig; + $client = new RecommendationServiceClient([ + 'credentials' => $googleClientConfig, + ]); + $userEventServiceClient = UserEventServiceClient::dataStoreName( + $googleRecommendationConfig['projectId'], + $googleRecommendationConfig['location'], + $googleRecommendationConfig['dataSource'], + ); + + foreach ($results as $result) { + $eventDateTime = new DateTime($result['client_event_time']); + $eventType = $result['event_type'] ?? null; + + $eventTypeWeb = $result['event_properties']['[Amplitude] Page Path'] ?? null; + $displayname = $result['event_properties']['username'] ?? null; + $messageId = $result['event_properties']['prompt_id'] ?? 0; + + $eventType = match (trim($eventType)) { + 'View Explore' => 'view-home-page', + 'Page Viewed' => 'view-item', + 'View Library' => 'view-home-page', + 'Select Prompt' => 'view-item', + '[Amplitude] Page Viewed' => 'view-item', + default => null, + }; + + if (! $eventType) { + continue; + } + + $user = UsersAssociatedApps::fromApp($app)->where('displayname', $displayname)->first(); + + if (! $user) { + continue; + } else { + $user = UsersAssociatedApps::fromApp($app)->where('users_id', $app->get('default_user_recommendation_catchall_id'))->first(); + } + + $document = new DocumentInfo(); + $document->setId($messageId); + + $eventTime = new Timestamp(); + $eventTime->fromDateTime($eventDateTime); + + // Prepare the request message. + $userEvent = (new UserEvent()) + ->setEventType($eventType) + ->setUserPseudoId((string) $user->users_id) + ->setDocuments([$document]) + ->setEventTime($eventTime); + + $writeUserEventRequest = (new WriteUserEventRequest()) + ->setParent($userEventServiceClient) + ->setUserEvent($userEvent); + + $client = new UserEventServiceClient([ + 'credentials' => $googleClientConfig, + ]); + + $client->writeUserEvent($writeUserEventRequest); + $this->info('Event: ' . $eventType . ' for user ' . $user->displayname . ' at ' . $eventTime->toDateTime()->format('Y-m-d H:i:s')); + } + } +} diff --git a/app/Console/Commands/Connectors/Apollo/SyncAllPeopleInCompanyCommand.php b/app/Console/Commands/Connectors/Apollo/SyncAllPeopleInCompanyCommand.php index b9d00105e..6e25023c6 100644 --- a/app/Console/Commands/Connectors/Apollo/SyncAllPeopleInCompanyCommand.php +++ b/app/Console/Commands/Connectors/Apollo/SyncAllPeopleInCompanyCommand.php @@ -5,7 +5,6 @@ namespace App\Console\Commands\Connectors\Apollo; use Baka\Traits\KanvasJobsTrait; -use Carbon\Carbon; use Illuminate\Console\Command; use Illuminate\Support\Facades\Cache; use Kanvas\Apps\Models\Apps; @@ -22,7 +21,7 @@ class SyncAllPeopleInCompanyCommand extends Command * * @var string */ - protected $signature = 'kanvas:guild-apollo-people-sync {app_id} {company_id} {total=200} {perPage=200}'; + protected $signature = 'kanvas:guild-apollo-people-sync {app_id} {company_id} {total=150} {perPage=50}'; /** * The console command description. @@ -44,100 +43,87 @@ public function handle() $this->overwriteAppService($app); $company = Companies::getById((int) $this->argument('company_id')); - $hourlyRateLimit = 400; // Maximum API calls per hour - $dailyRateLimit = 2000; // Maximum API calls per day - $batchSize = 100; // Number of people to process per batch + $hourlyRateLimit = 400; + $dailyRateLimit = 2000; + $batchSize = 100; $hourlyCacheKey = 'api_hourly_rate_limit_' . $app->getId(); $dailyCacheKey = 'api_daily_rate_limit_' . $app->getId(); $resetHourlyKey = 'api_hourly_rate_limit_reset_' . $app->getId(); $resetDailyKey = 'api_daily_rate_limit_reset_' . $app->getId(); - $hourlyTimeWindow = 60 * 60; // 1 hour in seconds - $dailyTimeWindow = 24 * 60 * 60; // 24 hours in seconds + $hourlyTimeWindow = 60 * 60; + $dailyTimeWindow = 24 * 60 * 60; - // Check the current count of API calls - $currentHourlyCount = Cache::get($hourlyCacheKey, 0); - $currentDailyCount = Cache::get($dailyCacheKey, 0); - $resetHourlyTimestamp = Cache::get($resetHourlyKey); - $resetDailyTimestamp = Cache::get($resetDailyKey); - - $this->line('Syncing ' . $currentHourlyCount . ' people in company ' . $company->name . ' from app ' . $app->name . ' total ' . $total . ' per page ' . $perPage); - - if ($resetHourlyTimestamp) { - $resetHourlyTime = Carbon::parse($resetHourlyTimestamp); - $currentTimestamp = now()->timestamp; - $hourlyWaitTime = $resetHourlyTime->timestamp - $currentTimestamp; - - if ($currentHourlyCount >= $hourlyRateLimit && $hourlyWaitTime > 0) { - $this->line("Hourly rate limit reached. Please wait $hourlyWaitTime seconds to run the process again."); - - return; - } + // Reset hourly/daily counters if time window has expired + if (now()->timestamp >= Cache::get($resetHourlyKey, 0)) { + Cache::put($hourlyCacheKey, 0, $hourlyTimeWindow); } - if ($resetDailyTimestamp) { - $resetDailyTime = Carbon::parse($resetDailyTimestamp); - $currentTimestamp = now()->timestamp; - $dailyWaitTime = $resetDailyTime->timestamp - $currentTimestamp; - - if ($currentDailyCount >= $dailyRateLimit && $dailyWaitTime > 0) { - $this->line("Daily rate limit reached. Please wait $dailyWaitTime seconds to run the process again."); - - return; - } + if (now()->timestamp >= Cache::get($resetDailyKey, 0)) { + Cache::put($dailyCacheKey, 0, $dailyTimeWindow); } - People::fromApp($app) - ->fromCompany($company) - ->notDeleted(0) - ->orderBy('peoples.id', 'DESC') - ->limit($total) - ->chunk($perPage, function ($peoples) use (&$currentHourlyCount, &$currentDailyCount, $hourlyRateLimit, $dailyRateLimit, $hourlyCacheKey, $dailyCacheKey, $resetHourlyKey, $resetDailyKey, $hourlyTimeWindow, $dailyTimeWindow) { - foreach ($peoples as $people) { - // Check if the person has the 'APOLLO_DATA_ENRICHMENT_CUSTOM_FIELDS' custom field - $hasCustomField = $people->get(ConfigurationEnum::APOLLO_DATA_ENRICHMENT_CUSTOM_FIELDS->value); - - if ($hasCustomField) { - // Skip this record if the custom field exists - continue; - } - // Process the record if the custom field does not exist - if ($currentHourlyCount >= $hourlyRateLimit) { - Cache::put($resetHourlyKey, now()->addSeconds($hourlyTimeWindow), $hourlyTimeWindow); - $this->line("Hourly rate limit reached. Please wait $hourlyTimeWindow seconds to run the process again."); - - return false; - } + $currentHourlyCount = Cache::get($hourlyCacheKey, 0); + $currentDailyCount = Cache::get($dailyCacheKey, 0); - if ($currentDailyCount >= $dailyRateLimit) { - Cache::put($resetDailyKey, now()->addSeconds($dailyTimeWindow), $dailyTimeWindow); - $this->line("Daily rate limit reached. Please wait $dailyTimeWindow seconds to run the process again."); + $this->line("Syncing people for company {$company->name} from app {$app->name}, total {$total}, per page {$perPage}"); - return false; + People::fromApp($app) + ->fromCompany($company) + ->notDeleted(0) + ->orderBy('peoples.id', 'DESC') + ->limit($total) + ->chunk($perPage, function ($peoples) use (&$currentHourlyCount, &$currentDailyCount, $hourlyRateLimit, $dailyRateLimit, $hourlyCacheKey, $dailyCacheKey, $resetHourlyKey, $resetDailyKey, $hourlyTimeWindow, $dailyTimeWindow) { + foreach ($peoples as $people) { + $hasCustomField = $people->get(ConfigurationEnum::APOLLO_DATA_ENRICHMENT_CUSTOM_FIELDS->value); + if ($hasCustomField) { + continue; + } + + if ($currentHourlyCount >= $hourlyRateLimit) { + Cache::put($resetHourlyKey, now()->addSeconds($hourlyTimeWindow), $hourlyTimeWindow); + $this->line('Hourly rate limit reached. Waiting for reset...'); + sleep($hourlyTimeWindow); + + continue; + } + + if ($currentDailyCount >= $dailyRateLimit) { + Cache::put($resetDailyKey, now()->addSeconds($dailyTimeWindow), $dailyTimeWindow); + $this->line('Daily rate limit reached. Waiting for reset...'); + sleep($dailyTimeWindow); + + continue; + } + + $this->line("Syncing people {$people->id}: {$people->firstname} {$people->lastname}"); + + $people->fireWorkflow( + WorkflowEnum::UPDATED->value, + true, + ['app' => $people->app] + ); + + $currentHourlyCount++; + $currentDailyCount++; + + Cache::put($hourlyCacheKey, $currentHourlyCount, $hourlyTimeWindow); + Cache::put($dailyCacheKey, $currentDailyCount, $dailyTimeWindow); + + // Dynamic delay based on remaining rate limit + $delay = $this->calculateDelay($currentHourlyCount, $hourlyRateLimit); + sleep($delay); } + }); - $this->line('Syncing people ' . $people->id . ' ' . $people->firstname . ' ' . $people->lastname); - - $people->fireWorkflow( - WorkflowEnum::UPDATED->value, - true, - [ - 'app' => $people->app, - ] - ); - //$people->clearLightHouseCacheJob(); - - // Increment the count and update cache for rate limiting - $currentHourlyCount++; - $currentDailyCount++; - Cache::put($hourlyCacheKey, $currentHourlyCount, $hourlyTimeWindow); - Cache::put($dailyCacheKey, $currentDailyCount, $dailyTimeWindow); - - usleep(100000); // 100ms delay between each request - } - }); + $this->line("All people for company {$company->name} from app {$app->name} synced"); + } - $this->line('All people in company ' . $company->name . ' from app ' . $app->name . ' synced'); + private function calculateDelay(int $currentCount, int $rateLimit): int + { + // Adjust delay dynamically to distribute requests evenly + $remainingRequests = $rateLimit - $currentCount; + $remainingTime = 60 * 60; // 1 hour in seconds - return; + return $remainingRequests > 0 ? intdiv($remainingTime, $remainingRequests) : 2; } } From fcf35bdffafc2b72616a4a70dd12c33ad44966bd Mon Sep 17 00:00:00 2001 From: kaioken Date: Sun, 20 Oct 2024 23:39:26 -0400 Subject: [PATCH 3/3] feat: add amplitude sync command --- .../Amplitude/SyncEventsWithGoogleCommand.php | 149 ++++++++++-------- 1 file changed, 80 insertions(+), 69 deletions(-) diff --git a/app/Console/Commands/Connectors/Amplitude/SyncEventsWithGoogleCommand.php b/app/Console/Commands/Connectors/Amplitude/SyncEventsWithGoogleCommand.php index 486cc62f6..52c99a309 100644 --- a/app/Console/Commands/Connectors/Amplitude/SyncEventsWithGoogleCommand.php +++ b/app/Console/Commands/Connectors/Amplitude/SyncEventsWithGoogleCommand.php @@ -5,8 +5,9 @@ namespace App\Console\Commands\Connectors\Amplitude; use Baka\Traits\KanvasJobsTrait; +use DateInterval; +use DatePeriod; use DateTime; -use Google\Cloud\DiscoveryEngine\V1\Client\RecommendationServiceClient; use Google\Cloud\DiscoveryEngine\V1\Client\UserEventServiceClient; use Google\Cloud\DiscoveryEngine\V1\DocumentInfo; use Google\Cloud\DiscoveryEngine\V1\UserEvent; @@ -18,12 +19,12 @@ use Kanvas\Connectors\Amplitude\Client; use Kanvas\Connectors\Google\Enums\ConfigurationEnum; use Kanvas\Exceptions\ValidationException; -use Kanvas\Social\Messages\Models\Message; use Kanvas\Users\Models\UsersAssociatedApps; class SyncEventsWithGoogleCommand extends Command { use KanvasJobsTrait; + /** * The name and signature of the console command. * @@ -47,86 +48,96 @@ public function handle() { $app = Apps::getById((int) $this->argument('app_id')); $company = Companies::getById((int) $this->argument('company_id')); - $amplitudeClient = new Client($app, $company); - $this->info('Syncing events from Amplitude to Google Dynamic Recommendation for app ' . $app->name . ' during ' . $this->argument('start_date') . ' to ' . $this->argument('end_date')); + $startDate = new DateTime($this->argument('start_date')); + $endDate = new DateTime($this->argument('end_date')); + + // Create a DatePeriod object to loop through each day + $interval = new DateInterval('P1D'); // 1 day interval + $dateRange = new DatePeriod($startDate, $interval, $endDate->add($interval)); - $results = $amplitudeClient->eventsExport($this->argument('start_date'), $this->argument('end_date')); $googleClientConfig = $app->get(ConfigurationEnum::GOOGLE_CLIENT_CONFIG->value); $googleRecommendationConfig = $app->get(ConfigurationEnum::GOOGLE_RECOMMENDATION_CONFIG->value); if (! $googleClientConfig) { - throw new ValidationException('Google client config not found for app ' . $$app->name); + throw new ValidationException('Google client config not found for app ' . $app->name); } if (! $googleRecommendationConfig) { - throw new ValidationException('Google recommendation config not found for app ' . $$app->name); + throw new ValidationException('Google recommendation config not found for app ' . $app->name); } - $googleClientConfig = $googleClientConfig; - $googleRecommendationConfig = $googleRecommendationConfig; - $client = new RecommendationServiceClient([ - 'credentials' => $googleClientConfig, - ]); - $userEventServiceClient = UserEventServiceClient::dataStoreName( - $googleRecommendationConfig['projectId'], - $googleRecommendationConfig['location'], - $googleRecommendationConfig['dataSource'], - ); - - foreach ($results as $result) { - $eventDateTime = new DateTime($result['client_event_time']); - $eventType = $result['event_type'] ?? null; - - $eventTypeWeb = $result['event_properties']['[Amplitude] Page Path'] ?? null; - $displayname = $result['event_properties']['username'] ?? null; - $messageId = $result['event_properties']['prompt_id'] ?? 0; - - $eventType = match (trim($eventType)) { - 'View Explore' => 'view-home-page', - 'Page Viewed' => 'view-item', - 'View Library' => 'view-home-page', - 'Select Prompt' => 'view-item', - '[Amplitude] Page Viewed' => 'view-item', - default => null, - }; - - if (! $eventType) { - continue; + foreach ($dateRange as $date) { + $currentDate = $date->format('Y-m-d'); + $this->info("Syncing events for {$currentDate}"); + + // Fetch events from Amplitude for the current day + $results = $amplitudeClient->eventsExport($currentDate, $currentDate); + + foreach ($results as $result) { + $eventDateTime = new DateTime($result['client_event_time']); + $eventType = $result['event_type'] ?? null; + + $eventTypeWeb = $result['event_properties']['[Amplitude] Page Path'] ?? null; + $displayname = $result['event_properties']['username'] ?? null; + $messageId = $result['event_properties']['prompt_id'] ?? 0; + + $eventType = match (trim($eventType)) { + 'View Explore' => 'view-home-page', + 'Page Viewed' => 'view-item', + 'View Library' => 'view-home-page', + 'Select Prompt' => 'view-item', + '[Amplitude] Page Viewed' => 'view-item', + default => null, + }; + + if (! $eventType) { + continue; + } + + $user = UsersAssociatedApps::fromApp($app)->where('displayname', $displayname)->first(); + + if (! $user) { + // Use default user if no user found + $user = UsersAssociatedApps::fromApp($app)->where('users_id', $app->get('default_user_recommendation_catchall_id'))->first(); + } + + if (! $user) { + continue; + } + + $document = new DocumentInfo(); + $document->setId($messageId); + + $eventTime = new Timestamp(); + $eventTime->fromDateTime($eventDateTime); + + // Prepare the request message + $userEvent = (new UserEvent()) + ->setEventType($eventType) + ->setUserPseudoId((string) $user->users_id) + ->setDocuments([$document]) + ->setEventTime($eventTime); + + $writeUserEventRequest = (new WriteUserEventRequest()) + ->setParent(UserEventServiceClient::dataStoreName( + $googleRecommendationConfig['projectId'], + $googleRecommendationConfig['location'], + $googleRecommendationConfig['dataSource'] + )) + ->setUserEvent($userEvent); + + $googleClient = new UserEventServiceClient([ + 'credentials' => $googleClientConfig, + ]); + + // Send event to Google + $googleClient->writeUserEvent($writeUserEventRequest); + $this->info('Event: ' . $eventType . ' for user ' . $user->displayname . ' at ' . $eventTime->toDateTime()->format('Y-m-d H:i:s')); } - - $user = UsersAssociatedApps::fromApp($app)->where('displayname', $displayname)->first(); - - if (! $user) { - continue; - } else { - $user = UsersAssociatedApps::fromApp($app)->where('users_id', $app->get('default_user_recommendation_catchall_id'))->first(); - } - - $document = new DocumentInfo(); - $document->setId($messageId); - - $eventTime = new Timestamp(); - $eventTime->fromDateTime($eventDateTime); - - // Prepare the request message. - $userEvent = (new UserEvent()) - ->setEventType($eventType) - ->setUserPseudoId((string) $user->users_id) - ->setDocuments([$document]) - ->setEventTime($eventTime); - - $writeUserEventRequest = (new WriteUserEventRequest()) - ->setParent($userEventServiceClient) - ->setUserEvent($userEvent); - - $client = new UserEventServiceClient([ - 'credentials' => $googleClientConfig, - ]); - - $client->writeUserEvent($writeUserEventRequest); - $this->info('Event: ' . $eventType . ' for user ' . $user->displayname . ' at ' . $eventTime->toDateTime()->format('Y-m-d H:i:s')); } + + $this->info('All events synced.'); } }