Commit 5e0e469d authored by Sebastian Kummer's avatar Sebastian Kummer

Merge branch 'feature/ZP-818-send-no-answer-collection-for-unchanged' into...

Merge branch 'feature/ZP-818-send-no-answer-collection-for-unchanged' into feature/ZP-832-detect-folder-changes-faster
parents f9e71d7f 5179ec79
...@@ -45,6 +45,9 @@ class Sync extends RequestProcessor { ...@@ -45,6 +45,9 @@ class Sync extends RequestProcessor {
// Ignored SMS identifier // Ignored SMS identifier
const ZPUSHIGNORESMS = "ZPISMS"; const ZPUSHIGNORESMS = "ZPISMS";
private $importer; private $importer;
private $globallyExportedItems;
private $startTagsSent = false;
private $startFolderTagSent = false;
/** /**
* Handles the Sync command * Handles the Sync command
...@@ -61,7 +64,7 @@ class Sync extends RequestProcessor { ...@@ -61,7 +64,7 @@ class Sync extends RequestProcessor {
$status = SYNC_STATUS_SUCCESS; $status = SYNC_STATUS_SUCCESS;
$wbxmlproblem = false; $wbxmlproblem = false;
$emptysync = false; $emptysync = false;
$globallyExportedItems = 0; $this->globallyExportedItems = 0;
// check if the hierarchySync was fully completed // check if the hierarchySync was fully completed
...@@ -662,378 +665,429 @@ class Sync extends RequestProcessor { ...@@ -662,378 +665,429 @@ class Sync extends RequestProcessor {
} }
} }
ZLog::Write(LOGLEVEL_DEBUG, sprintf("HandleSync(): Start Output"));
// Start the output // Start the output
self::$encoder->startWBXML(); ZLog::Write(LOGLEVEL_DEBUG, "HandleSync(): Start Output");
self::$encoder->startTag(SYNC_SYNCHRONIZE);
{ // global status
// global status // SYNC_COMMONSTATUS_* start with values from 101
// SYNC_COMMONSTATUS_* start with values from 101 if ($status != SYNC_COMMONSTATUS_SUCCESS && $status > 100) {
if ($status != SYNC_COMMONSTATUS_SUCCESS && $status > 100) { $this->sendStartTags();
self::$encoder->startTag(SYNC_STATUS); self::$encoder->startTag(SYNC_STATUS);
self::$encoder->content($status); self::$encoder->content($status);
self::$encoder->endTag(); self::$encoder->endTag();
} return true;
else { }
self::$encoder->startTag(SYNC_FOLDERS);
{
foreach($sc as $folderid => $spa) {
// get actiondata
$actiondata = $sc->GetParameter($spa, "actiondata");
if ($status == SYNC_STATUS_SUCCESS && (!$spa->GetContentClass() || !$spa->GetFolderId())) {
ZLog::Write(LOGLEVEL_ERROR, sprintf("HandleSync(): no content class or folderid found for collection."));
continue;
}
if (! $sc->GetParameter($spa, "requested"))
ZLog::Write(LOGLEVEL_DEBUG, sprintf("HandleSync(): partial sync for folder class '%s' with id '%s'", $spa->GetContentClass(), $spa->GetFolderId()));
// initialize exporter to get changecount // Loop through requested folders
$changecount = false; foreach($sc as $folderid => $spa) {
if (isset($exporter)) // get actiondata
unset($exporter); $actiondata = $sc->GetParameter($spa, "actiondata");
// TODO we could check against $sc->GetChangedFolderIds() on heartbeat so we do not need to configure all exporter again if ($status == SYNC_STATUS_SUCCESS && (!$spa->GetContentClass() || !$spa->GetFolderId())) {
if($status == SYNC_STATUS_SUCCESS && ($sc->GetParameter($spa, "getchanges") || ! $spa->HasSyncKey())) { ZLog::Write(LOGLEVEL_ERROR, sprintf("HandleSync(): no content class or folderid found for collection."));
continue;
}
//make sure the states are loaded if (! $sc->GetParameter($spa, "requested"))
$status = $this->loadStates($sc, $spa, $actiondata); ZLog::Write(LOGLEVEL_DEBUG, sprintf("HandleSync(): partial sync for folder class '%s' with id '%s'", $spa->GetContentClass(), $spa->GetFolderId()));
if($status == SYNC_STATUS_SUCCESS) { // initialize exporter to get changecount
try { $changecount = false;
// if this is an additional folder the backend has to be setup correctly $exporter = false;
if (!self::$backend->Setup(ZPush::GetAdditionalSyncFolderStore($spa->GetFolderId()))) $streamimporter = false;
throw new StatusException(sprintf("HandleSync() could not Setup() the backend for folder id '%s'", $spa->GetFolderId()), SYNC_STATUS_FOLDERHIERARCHYCHANGED);
// Use the state from the importer, as changes may have already happened // TODO we could check against $sc->GetChangedFolderIds() on heartbeat so we do not need to configure all exporter again
$exporter = self::$backend->GetExporter($spa->GetFolderId()); if($status == SYNC_STATUS_SUCCESS && ($sc->GetParameter($spa, "getchanges") || ! $spa->HasSyncKey())) {
if ($exporter === false) //make sure the states are loaded
throw new StatusException(sprintf("HandleSync() could not get an exporter for folder id '%s'", $spa->GetFolderId()), SYNC_STATUS_FOLDERHIERARCHYCHANGED); $status = $this->loadStates($sc, $spa, $actiondata);
}
catch (StatusException $stex) {
$status = $stex->getCode();
}
try {
// Stream the messages directly to the PDA
$streamimporter = new ImportChangesStream(self::$encoder, ZPush::getSyncObjectFromFolderClass($spa->GetContentClass()));
if ($exporter !== false) { if($status == SYNC_STATUS_SUCCESS) {
$exporter->Config($sc->GetParameter($spa, "state")); try {
$exporter->ConfigContentParameters($spa->GetCPO()); // if this is an additional folder the backend has to be setup correctly
$exporter->InitializeExporter($streamimporter); if (!self::$backend->Setup(ZPush::GetAdditionalSyncFolderStore($spa->GetFolderId())))
throw new StatusException(sprintf("HandleSync() could not Setup() the backend for folder id '%s'", $spa->GetFolderId()), SYNC_STATUS_FOLDERHIERARCHYCHANGED);
$changecount = $exporter->GetChangeCount(); // Use the state from the importer, as changes may have already happened
} $exporter = self::$backend->GetExporter($spa->GetFolderId());
}
catch (StatusException $stex) {
if ($stex->getCode() === SYNC_FSSTATUS_CODEUNKNOWN && $spa->HasSyncKey())
$status = SYNC_STATUS_INVALIDSYNCKEY;
else
$status = $stex->getCode();
}
if (! $spa->HasSyncKey()) { if ($exporter === false)
self::$topCollector->AnnounceInformation(sprintf("Exporter registered. %d objects queued.", $changecount), true); throw new StatusException(sprintf("HandleSync() could not get an exporter for folder id '%s'", $spa->GetFolderId()), SYNC_STATUS_FOLDERHIERARCHYCHANGED);
// update folder status as initialized }
$spa->SetFolderSyncTotal($changecount); catch (StatusException $stex) {
$spa->SetFolderSyncRemaining($changecount); $status = $stex->getCode();
if ($changecount > 0) { }
self::$deviceManager->SetFolderSyncStatus($folderid, DeviceManager::FLD_SYNC_INITIALIZED); try {
} // Stream the messages directly to the PDA
} $streamimporter = new ImportChangesStream(self::$encoder, ZPush::getSyncObjectFromFolderClass($spa->GetContentClass()));
else if ($status != SYNC_STATUS_SUCCESS)
self::$topCollector->AnnounceInformation(sprintf("StatusException code: %d", $status), true);
} if ($exporter !== false) {
} $exporter->Config($sc->GetParameter($spa, "state"));
$exporter->ConfigContentParameters($spa->GetCPO());
$exporter->InitializeExporter($streamimporter);
if (isset($hbinterval) && $changecount == 0 && $status == SYNC_STATUS_SUCCESS) { $changecount = $exporter->GetChangeCount();
ZLog::Write(LOGLEVEL_DEBUG, "No changes found for heartbeat folder. Omitting empty output.");
continue;
} }
}
catch (StatusException $stex) {
if ($stex->getCode() === SYNC_FSSTATUS_CODEUNKNOWN && $spa->HasSyncKey())
$status = SYNC_STATUS_INVALIDSYNCKEY;
else
$status = $stex->getCode();
}
// Get a new sync key to output to the client if any changes have been send by the mobile or a new synckey is to be sent if (! $spa->HasSyncKey()) {
if (!empty($actiondata["modifyids"]) || self::$topCollector->AnnounceInformation(sprintf("Exporter registered. %d objects queued.", $changecount), true);
!empty($actiondata["clientids"]) || // update folder status as initialized
!empty($actiondata["removeids"]) || $spa->SetFolderSyncTotal($changecount);
(! $spa->HasSyncKey() && $status == SYNC_STATUS_SUCCESS)) { $spa->SetFolderSyncRemaining($changecount);
$spa->SetNewSyncKey(self::$deviceManager->GetStateManager()->GetNewSyncKey($spa->GetSyncKey())); if ($changecount > 0) {
self::$deviceManager->SetFolderSyncStatus($folderid, DeviceManager::FLD_SYNC_INITIALIZED);
} }
// get a new synckey only if we did not reach the global limit yet }
else { else if ($status != SYNC_STATUS_SUCCESS)
// when reaching the global limit for changes of all collections, stop processing other collections (ZP-697) self::$topCollector->AnnounceInformation(sprintf("StatusException code: %d", $status), true);
if ($sc->GetGlobalWindowSize() <= $globallyExportedItems) {
ZLog::Write(LOGLEVEL_DEBUG, "Global WindowSize for amount of exported changes reached, omitting output for collection.");
continue;
}
// get a new synckey if there are changes are we did not reach the limit yet }
if ($changecount > 0) { }
$spa->SetNewSyncKey(self::$deviceManager->GetStateManager()->GetNewSyncKey($spa->GetSyncKey()));
}
}
self::$encoder->startTag(SYNC_FOLDER); // Get a new sync key to output to the client if any changes have been send by the mobile or a new synckey is to be sent
if (!empty($actiondata["modifyids"]) ||
!empty($actiondata["clientids"]) ||
!empty($actiondata["removeids"]) ||
(! $spa->HasSyncKey() && $status == SYNC_STATUS_SUCCESS)) {
$spa->SetNewSyncKey(self::$deviceManager->GetStateManager()->GetNewSyncKey($spa->GetSyncKey()));
}
// get a new synckey only if we did not reach the global limit yet
else {
// when reaching the global limit for changes of all collections, stop processing other collections (ZP-697)
if ($sc->GetGlobalWindowSize() <= $this->globallyExportedItems) {
ZLog::Write(LOGLEVEL_DEBUG, "Global WindowSize for amount of exported changes reached, omitting output for collection.");
continue;
}
if($spa->HasContentClass()) { // get a new synckey if there are changes are we did not reach the limit yet
ZLog::Write(LOGLEVEL_DEBUG, sprintf("Folder type: %s", $spa->GetContentClass())); if ($changecount > 0) {
// AS 12.0 devices require content class $spa->SetNewSyncKey(self::$deviceManager->GetStateManager()->GetNewSyncKey($spa->GetSyncKey()));
if (Request::GetProtocolVersion() < 12.1) { }
self::$encoder->startTag(SYNC_FOLDERTYPE); }
self::$encoder->content($spa->GetContentClass());
self::$encoder->endTag();
}
}
self::$encoder->startTag(SYNC_SYNCKEY); // Fir AS 14.0+ omit output for folder, if there were no incoming or outgoing changes and no Fetch
if($status == SYNC_STATUS_SUCCESS && $spa->HasNewSyncKey()) if (Request::GetProtocolVersion() >= 14.0 && ! $spa->HasNewSyncKey() && $changecount == 0 && empty($actiondata["fetchids"]) && $status == SYNC_STATUS_SUCCESS) {
self::$encoder->content($spa->GetNewSyncKey()); ZLog::Write(LOGLEVEL_DEBUG, sprintf("HandleSync: No changes found for %s folder id '%s'. Omitting output.", $spa->GetContentClass(), $spa->GetFolderId()));
else continue;
self::$encoder->content($spa->GetSyncKey()); }
self::$encoder->endTag();
self::$encoder->startTag(SYNC_FOLDERID);
self::$encoder->content($spa->GetFolderId());
self::$encoder->endTag();
self::$encoder->startTag(SYNC_STATUS);
self::$encoder->content($status);
self::$encoder->endTag();
// announce failing status to the process loop detection
if ($status !== SYNC_STATUS_SUCCESS)
self::$deviceManager->AnnounceProcessStatus($spa->GetFolderId(), $status);
// Output IDs and status for incoming items & requests
if($status == SYNC_STATUS_SUCCESS && (
!empty($actiondata["clientids"]) ||
!empty($actiondata["modifyids"]) ||
!empty($actiondata["removeids"]) ||
!empty($actiondata["fetchids"]) )) {
self::$encoder->startTag(SYNC_REPLIES);
// output result of all new incoming items
foreach($actiondata["clientids"] as $clientid => $serverid) {
self::$encoder->startTag(SYNC_ADD);
self::$encoder->startTag(SYNC_CLIENTENTRYID);
self::$encoder->content($clientid);
self::$encoder->endTag();
if ($serverid) {
self::$encoder->startTag(SYNC_SERVERENTRYID);
self::$encoder->content($serverid);
self::$encoder->endTag();
}
self::$encoder->startTag(SYNC_STATUS);
self::$encoder->content((isset($actiondata["statusids"][$clientid])?$actiondata["statusids"][$clientid]:SYNC_STATUS_CLIENTSERVERCONVERSATIONERROR));
self::$encoder->endTag();
self::$encoder->endTag();
}
// loop through modify operations which were not a success, send status // there is something to send here, sync folder to output
foreach($actiondata["modifyids"] as $serverid) { $this->syncFolder($sc, $spa, $exporter, $changecount, $streamimporter, $status);
if (isset($actiondata["statusids"][$serverid]) && $actiondata["statusids"][$serverid] !== SYNC_STATUS_SUCCESS) {
self::$encoder->startTag(SYNC_MODIFY);
self::$encoder->startTag(SYNC_SERVERENTRYID);
self::$encoder->content($serverid);
self::$encoder->endTag();
self::$encoder->startTag(SYNC_STATUS);
self::$encoder->content($actiondata["statusids"][$serverid]);
self::$encoder->endTag();
self::$encoder->endTag();
}
}
// loop through remove operations which were not a success, send status // reset status for the next folder
foreach($actiondata["removeids"] as $serverid) { $status = SYNC_STATUS_SUCCESS;
if (isset($actiondata["statusids"][$serverid]) && $actiondata["statusids"][$serverid] !== SYNC_STATUS_SUCCESS) { } // END foreach collection
self::$encoder->startTag(SYNC_REMOVE);
self::$encoder->startTag(SYNC_SERVERENTRYID);
self::$encoder->content($serverid);
self::$encoder->endTag();
self::$encoder->startTag(SYNC_STATUS);
self::$encoder->content($actiondata["statusids"][$serverid]);
self::$encoder->endTag();
self::$encoder->endTag();
}
}
if (!empty($actiondata["fetchids"])) //SYNC_FOLDERS - only if the starttag was sent
self::$topCollector->AnnounceInformation(sprintf("Fetching %d objects ", count($actiondata["fetchids"])), true); if ($this->startFolderTagSent)
self::$encoder->endTag();
foreach($actiondata["fetchids"] as $id) { //SYNC_SYNCHRONIZE - only if the starttag was sent
$data = false; if ($this->startTagsSent)
try { self::$encoder->endTag();
$fetchstatus = SYNC_STATUS_SUCCESS;
// if this is an additional folder the backend has to be setup correctly return true;
if (!self::$backend->Setup(ZPush::GetAdditionalSyncFolderStore($spa->GetFolderId()))) }
throw new StatusException(sprintf("HandleSync(): could not Setup() the backend to fetch in folder id '%s'", $spa->GetFolderId()), SYNC_STATUS_OBJECTNOTFOUND);
$data = self::$backend->Fetch($spa->GetFolderId(), $id, $spa->GetCPO()); /**
* Sends the SYNC_SYNCHRONIZE once per request.
*
* @access private
* @return void
*/
private function sendStartTags() {
if ($this->startTagsSent === false) {
self::$encoder->startWBXML();
self::$encoder->startTag(SYNC_SYNCHRONIZE);
$this->startTagsSent = true;
}
}
// check if the message is broken /**
if (ZPush::GetDeviceManager(false) && ZPush::GetDeviceManager()->DoNotStreamMessage($id, $data)) { * Sends the SYNC_FOLDERS once per request.
ZLog::Write(LOGLEVEL_DEBUG, sprintf("HandleSync(): message not to be streamed as requested by DeviceManager, id = %s", $id)); *
$fetchstatus = SYNC_STATUS_CLIENTSERVERCONVERSATIONERROR; * @access private
} * @return void
} */
catch (StatusException $stex) { private function sendFolderStartTag() {
$fetchstatus = $stex->getCode(); $this->sendStartTags();
} if ($this->startFolderTagSent === false) {
self::$encoder->startTag(SYNC_FOLDERS);
$this->startFolderTagSent = true;
}
}
self::$encoder->startTag(SYNC_FETCH); /**
self::$encoder->startTag(SYNC_SERVERENTRYID); * Synchronizes a folder to the output stream. Changes for this folders are expected.
self::$encoder->content($id); *
self::$encoder->endTag(); * @param SyncCollections $sc
* @param SyncParameters $spa
* @param IExportChanges $exporter Fully configured exporter for this folder
* @param int $changecount Amount of changes expected
* @param ImportChangesStream $streamimporter Output stream
* @param int $status current status of the folder processing
* @throws StatusException
* @return int sync status code
*/
private function syncFolder($sc, $spa, $exporter, $changecount, $streamimporter, $status) {
$actiondata = $sc->GetParameter($spa, "actiondata");
// send the WBXML start tags (if not happened already)
$this->sendFolderStartTag();
self::$encoder->startTag(SYNC_FOLDER);
if($spa->HasContentClass()) {
ZLog::Write(LOGLEVEL_DEBUG, sprintf("Folder type: %s", $spa->GetContentClass()));
// AS 12.0 devices require content class
if (Request::GetProtocolVersion() < 12.1) {
self::$encoder->startTag(SYNC_FOLDERTYPE);
self::$encoder->content($spa->GetContentClass());
self::$encoder->endTag();
}
}
self::$encoder->startTag(SYNC_STATUS); self::$encoder->startTag(SYNC_SYNCKEY);
self::$encoder->content($fetchstatus); if($status == SYNC_STATUS_SUCCESS && $spa->HasNewSyncKey())
self::$encoder->endTag(); self::$encoder->content($spa->GetNewSyncKey());
else
self::$encoder->content($spa->GetSyncKey());
self::$encoder->endTag();
self::$encoder->startTag(SYNC_FOLDERID);
self::$encoder->content($spa->GetFolderId());
self::$encoder->endTag();
self::$encoder->startTag(SYNC_STATUS);
self::$encoder->content($status);
self::$encoder->endTag();
// announce failing status to the process loop detection
if ($status !== SYNC_STATUS_SUCCESS)
self::$deviceManager->AnnounceProcessStatus($spa->GetFolderId(), $status);
// Output IDs and status for incoming items & requests
if($status == SYNC_STATUS_SUCCESS && (
!empty($actiondata["clientids"]) ||
!empty($actiondata["modifyids"]) ||
!empty($actiondata["removeids"]) ||
!empty($actiondata["fetchids"]) )) {
self::$encoder->startTag(SYNC_REPLIES);
// output result of all new incoming items
foreach($actiondata["clientids"] as $clientid => $serverid) {
self::$encoder->startTag(SYNC_ADD);
self::$encoder->startTag(SYNC_CLIENTENTRYID);
self::$encoder->content($clientid);
self::$encoder->endTag();
if ($serverid) {
self::$encoder->startTag(SYNC_SERVERENTRYID);
self::$encoder->content($serverid);
self::$encoder->endTag();
}
self::$encoder->startTag(SYNC_STATUS);
self::$encoder->content((isset($actiondata["statusids"][$clientid])?$actiondata["statusids"][$clientid]:SYNC_STATUS_CLIENTSERVERCONVERSATIONERROR));
self::$encoder->endTag();
self::$encoder->endTag();
}
if($data !== false && $status == SYNC_STATUS_SUCCESS) { // loop through modify operations which were not a success, send status
self::$encoder->startTag(SYNC_DATA); foreach($actiondata["modifyids"] as $serverid) {
$data->Encode(self::$encoder); if (isset($actiondata["statusids"][$serverid]) && $actiondata["statusids"][$serverid] !== SYNC_STATUS_SUCCESS) {
self::$encoder->endTag(); self::$encoder->startTag(SYNC_MODIFY);
} self::$encoder->startTag(SYNC_SERVERENTRYID);
else self::$encoder->content($serverid);
ZLog::Write(LOGLEVEL_WARN, sprintf("Unable to Fetch '%s'", $id)); self::$encoder->endTag();
self::$encoder->endTag(); self::$encoder->startTag(SYNC_STATUS);
self::$encoder->content($actiondata["statusids"][$serverid]);
self::$encoder->endTag();
self::$encoder->endTag();
}
}
} // loop through remove operations which were not a success, send status
self::$encoder->endTag(); foreach($actiondata["removeids"] as $serverid) {
} if (isset($actiondata["statusids"][$serverid]) && $actiondata["statusids"][$serverid] !== SYNC_STATUS_SUCCESS) {
self::$encoder->startTag(SYNC_REMOVE);
self::$encoder->startTag(SYNC_SERVERENTRYID);
self::$encoder->content($serverid);
self::$encoder->endTag();
self::$encoder->startTag(SYNC_STATUS);
self::$encoder->content($actiondata["statusids"][$serverid]);
self::$encoder->endTag();
self::$encoder->endTag();
}
}
if($sc->GetParameter($spa, "getchanges") && $spa->HasFolderId() && $spa->HasContentClass() && $spa->HasSyncKey()) { if (!empty($actiondata["fetchids"]))
$windowSize = self::$deviceManager->GetWindowSize($spa->GetFolderId(), $spa->GetContentClass(), $spa->GetUuid(), $spa->GetUuidCounter(), $changecount); self::$topCollector->AnnounceInformation(sprintf("Fetching %d objects ", count($actiondata["fetchids"])), true);
// limit windowSize to the max available limit of the global window size left foreach($actiondata["fetchids"] as $id) {
$globallyAvailable = $sc->GetGlobalWindowSize() - $globallyExportedItems; $data = false;
if ($changecount > $globallyAvailable && $windowSize > $globallyAvailable) { try {
ZLog::Write(LOGLEVEL_DEBUG, sprintf("HandleSync(): Limit window size to %d as the global window size limit will be reached", $globallyAvailable)); $fetchstatus = SYNC_STATUS_SUCCESS;
$windowSize = $globallyAvailable;
}
// send <MoreAvailable/> if there are more changes than fit in the folder windowsize
if($changecount > $windowSize) {
self::$encoder->startTag(SYNC_MOREAVAILABLE, false, true);
}
}
// Stream outgoing changes // if this is an additional folder the backend has to be setup correctly
if($status == SYNC_STATUS_SUCCESS && $sc->GetParameter($spa, "getchanges") == true && $windowSize > 0) { if (!self::$backend->Setup(ZPush::GetAdditionalSyncFolderStore($spa->GetFolderId())))
self::$topCollector->AnnounceInformation(sprintf("Streaming data of %d objects", (($changecount > $windowSize)?$windowSize:$changecount))); throw new StatusException(sprintf("HandleSync(): could not Setup() the backend to fetch in folder id '%s'", $spa->GetFolderId()), SYNC_STATUS_OBJECTNOTFOUND);
// Output message changes per folder
self::$encoder->startTag(SYNC_PERFORM);
$n = 0;
WBXMLDecoder::ResetInWhile("syncSynchronize");
while(WBXMLDecoder::InWhile("syncSynchronize")) {
try {
$progress = $exporter->Synchronize();
if(!is_array($progress))
break;
$n++;
if ($n % 10 == 0)
self::$topCollector->AnnounceInformation(sprintf("Streamed data of %d objects out of %d", $n, (($changecount > $windowSize)?$windowSize:$changecount)));
}
catch (SyncObjectBrokenException $mbe) {
$brokenSO = $mbe->GetSyncObject();
if (!$brokenSO) {
ZLog::Write(LOGLEVEL_ERROR, sprintf("HandleSync(): Catched SyncObjectBrokenException but broken SyncObject not available. This should be fixed in the backend."));
}
else {
if (!isset($brokenSO->id)) {
$brokenSO->id = "Unknown ID";
ZLog::Write(LOGLEVEL_ERROR, sprintf("HandleSync(): Catched SyncObjectBrokenException but no ID of object set. This should be fixed in the backend."));
}
self::$deviceManager->AnnounceIgnoredMessage($spa->GetFolderId(), $brokenSO->id, $brokenSO);
}
}
// something really bad happened while exporting changes
catch (StatusException $stex) {
$status = $stex->getCode();
// during export we found out that the states should be thrown away (ZP-623)
if ($status == SYNC_STATUS_INVALIDSYNCKEY) {
self::$deviceManager->ForceFolderResync($spa->GetFolderId());
break;
}
}
if($n >= $windowSize) { $data = self::$backend->Fetch($spa->GetFolderId(), $id, $spa->GetCPO());
ZLog::Write(LOGLEVEL_DEBUG, sprintf("HandleSync(): Exported maxItems of messages: %d / %d", $n, $changecount));
break;
}
}
// $progress is not an array when exporting the last message // check if the message is broken
// so we get the number to display from the streamimporter if (ZPush::GetDeviceManager(false) && ZPush::GetDeviceManager()->DoNotStreamMessage($id, $data)) {
if (isset($streamimporter)) { ZLog::Write(LOGLEVEL_DEBUG, sprintf("HandleSync(): message not to be streamed as requested by DeviceManager, id = %s", $id));
$n = $streamimporter->GetImportedMessages(); $fetchstatus = SYNC_STATUS_CLIENTSERVERCONVERSATIONERROR;
} }
}
catch (StatusException $stex) {
$fetchstatus = $stex->getCode();
}
self::$encoder->endTag(); self::$encoder->startTag(SYNC_FETCH);
self::$topCollector->AnnounceInformation(sprintf("Outgoing %d objects%s", $n, ($n >= $windowSize)?" of ".$changecount:""), true); self::$encoder->startTag(SYNC_SERVERENTRYID);
$globallyExportedItems += $n; self::$encoder->content($id);
self::$encoder->endTag();
// update folder status, if there is something set self::$encoder->startTag(SYNC_STATUS);
if ($spa->GetFolderSyncRemaining() && $changecount > 0) { self::$encoder->content($fetchstatus);
$spa->SetFolderSyncRemaining($changecount); self::$encoder->endTag();
}
// changecount is initialized with 'false', so 0 means no changes!
if ($changecount === 0 || ($changecount !== false && $changecount <= $windowSize))
self::$deviceManager->SetFolderSyncStatus($folderid, DeviceManager::FLD_SYNC_COMPLETED);
else
self::$deviceManager->SetFolderSyncStatus($folderid, DeviceManager::FLD_SYNC_INPROGRESS);
}
self::$encoder->endTag(); if($data !== false && $status == SYNC_STATUS_SUCCESS) {
self::$encoder->startTag(SYNC_DATA);
$data->Encode(self::$encoder);
self::$encoder->endTag();
}
else
ZLog::Write(LOGLEVEL_WARN, sprintf("Unable to Fetch '%s'", $id));
self::$encoder->endTag();
// Save the sync state for the next time }
if($spa->HasNewSyncKey()) { self::$encoder->endTag();
self::$topCollector->AnnounceInformation("Saving state"); }
try { if($sc->GetParameter($spa, "getchanges") && $spa->HasFolderId() && $spa->HasContentClass() && $spa->HasSyncKey()) {
if (isset($exporter) && $exporter) $windowSize = self::$deviceManager->GetWindowSize($spa->GetFolderId(), $spa->GetContentClass(), $spa->GetUuid(), $spa->GetUuidCounter(), $changecount);
$state = $exporter->GetState();
// nothing exported, but possibly imported - get the importer state // limit windowSize to the max available limit of the global window size left
else if ($sc->GetParameter($spa, "state") !== null) $globallyAvailable = $sc->GetGlobalWindowSize() - $this->globallyExportedItems;
$state = $sc->GetParameter($spa, "state"); if ($changecount > $globallyAvailable && $windowSize > $globallyAvailable) {
ZLog::Write(LOGLEVEL_DEBUG, sprintf("HandleSync(): Limit window size to %d as the global window size limit will be reached", $globallyAvailable));
$windowSize = $globallyAvailable;
}
// send <MoreAvailable/> if there are more changes than fit in the folder windowsize
if($changecount > $windowSize) {
self::$encoder->startTag(SYNC_MOREAVAILABLE, false, true);
}
}
// if a new request without state information (hierarchy) save an empty state // Stream outgoing changes
else if (! $spa->HasSyncKey()) if($status == SYNC_STATUS_SUCCESS && $sc->GetParameter($spa, "getchanges") == true && $windowSize > 0 && !!$exporter) {
$state = ""; self::$topCollector->AnnounceInformation(sprintf("Streaming data of %d objects", (($changecount > $windowSize)?$windowSize:$changecount)));
}
catch (StatusException $stex) {
$status = $stex->getCode();
}
// Output message changes per folder
self::$encoder->startTag(SYNC_PERFORM);
if (isset($state) && $status == SYNC_STATUS_SUCCESS) $n = 0;
self::$deviceManager->GetStateManager()->SetSyncState($spa->GetNewSyncKey(), $state, $spa->GetFolderId()); WBXMLDecoder::ResetInWhile("syncSynchronize");
else while(WBXMLDecoder::InWhile("syncSynchronize")) {
ZLog::Write(LOGLEVEL_ERROR, sprintf("HandleSync(): error saving '%s' - no state information available", $spa->GetNewSyncKey())); try {
$progress = $exporter->Synchronize();
if(!is_array($progress))
break;
$n++;
if ($n % 10 == 0)
self::$topCollector->AnnounceInformation(sprintf("Streamed data of %d objects out of %d", $n, (($changecount > $windowSize)?$windowSize:$changecount)));
}
catch (SyncObjectBrokenException $mbe) {
$brokenSO = $mbe->GetSyncObject();
if (!$brokenSO) {
ZLog::Write(LOGLEVEL_ERROR, sprintf("HandleSync(): Catched SyncObjectBrokenException but broken SyncObject not available. This should be fixed in the backend."));
}
else {
if (!isset($brokenSO->id)) {
$brokenSO->id = "Unknown ID";
ZLog::Write(LOGLEVEL_ERROR, sprintf("HandleSync(): Catched SyncObjectBrokenException but no ID of object set. This should be fixed in the backend."));
} }
self::$deviceManager->AnnounceIgnoredMessage($spa->GetFolderId(), $brokenSO->id, $brokenSO);
}
}
// something really bad happened while exporting changes
catch (StatusException $stex) {
$status = $stex->getCode();
// during export we found out that the states should be thrown away (ZP-623)
if ($status == SYNC_STATUS_INVALIDSYNCKEY) {
self::$deviceManager->ForceFolderResync($spa->GetFolderId());
break;
}
}
// save SyncParameters if($n >= $windowSize) {
if ($status == SYNC_STATUS_SUCCESS && empty($actiondata["fetchids"])) ZLog::Write(LOGLEVEL_DEBUG, sprintf("HandleSync(): Exported maxItems of messages: %d / %d", $n, $changecount));
$sc->SaveCollection($spa); break;
}
}
// reset status for the next folder // $progress is not an array when exporting the last message
$status = SYNC_STATUS_SUCCESS; // so we get the number to display from the streamimporter if it's available
if (!!$streamimporter) {
$n = $streamimporter->GetImportedMessages();
}
self::$encoder->endTag();
self::$topCollector->AnnounceInformation(sprintf("Outgoing %d objects%s", $n, ($n >= $windowSize)?" of ".$changecount:""), true);
$this->globallyExportedItems += $n;
} // END foreach collection // update folder status, if there is something set
} if ($spa->GetFolderSyncRemaining() && $changecount > 0) {
self::$encoder->endTag(); //SYNC_FOLDERS $spa->SetFolderSyncRemaining($changecount);
} }
// changecount is initialized with 'false', so 0 means no changes!
if ($changecount === 0 || ($changecount !== false && $changecount <= $windowSize))
self::$deviceManager->SetFolderSyncStatus($spa->GetFolderId(), DeviceManager::FLD_SYNC_COMPLETED);
else
self::$deviceManager->SetFolderSyncStatus($spa->GetFolderId(), DeviceManager::FLD_SYNC_INPROGRESS);
} }
self::$encoder->endTag(); //SYNC_SYNCHRONIZE
return true; self::$encoder->endTag();
// Save the sync state for the next time
if($spa->HasNewSyncKey()) {
self::$topCollector->AnnounceInformation("Saving state");
try {
if (isset($exporter) && $exporter)
$state = $exporter->GetState();
// nothing exported, but possibly imported - get the importer state
else if ($sc->GetParameter($spa, "state") !== null)
$state = $sc->GetParameter($spa, "state");
// if a new request without state information (hierarchy) save an empty state
else if (! $spa->HasSyncKey())
$state = "";
}
catch (StatusException $stex) {
$status = $stex->getCode();
}
if (isset($state) && $status == SYNC_STATUS_SUCCESS)
self::$deviceManager->GetStateManager()->SetSyncState($spa->GetNewSyncKey(), $state, $spa->GetFolderId());
else
ZLog::Write(LOGLEVEL_ERROR, sprintf("HandleSync(): error saving '%s' - no state information available", $spa->GetNewSyncKey()));
}
// save SyncParameters
if ($status == SYNC_STATUS_SUCCESS && empty($actiondata["fetchids"]))
$sc->SaveCollection($spa);
return $status;
} }
/** /**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment