//
//  IPC.m
//  Monal
//
//  Created by Thilo Molitor on 31.07.20.
//  Copyright © 2020 Monal.im. All rights reserved.
//

//NOTE(review): the header names of the three system imports were lost in the text this file was restored from.
//Foundation and CoreFoundation are what the code below visibly needs; please restore the third import from version control.
#import <Foundation/Foundation.h>
#import <CoreFoundation/CoreFoundation.h>
#import "IPC.h"
#import "MLSQLite.h"
#import "HelperTools.h"

//lifetime of a message in the ipc table in seconds (also used to derive the lifetime of response handlers)
#define MSG_TIMEOUT 2.0

@interface IPC()
{
    NSString* _processName;
    NSString* _dbFile;
    NSMutableDictionary* _ipcQueues;
    NSCondition* _serverThreadCondition;
    //predicate protected by _serverThreadCondition: YES if a wakeup was requested that the server thread did not consume yet
    BOOL _wakeupPending;
}
@property (readonly, strong) MLSQLite* db;
@property (readonly, strong) NSThread* serverThread;

-(void) incomingDarwinNotification:(NSString*) name;
-(void) wakeupServerThread;

@end

static volatile NSMutableDictionary* _responseHandlers;
static volatile IPC* _sharedInstance;
static volatile CFNotificationCenterRef _darwinNotificationCenterRef;

//forward notifications to the IPC instance that is waiting (the instance running the server thread)
void darwinNotificationCenterCallback(CFNotificationCenterRef center __unused, void* observer, CFNotificationName name, const void* object __unused, CFDictionaryRef userInfo __unused)
{
    [(__bridge IPC*)observer incomingDarwinNotification:(__bridge NSString*)name];
}

@implementation IPC

//create the shared IPC instance for the given process name and start its server thread
//asserts if a shared instance already exists
+(void) initializeForProcess:(NSString*) processName
{
    @synchronized(self) {
        MLAssert(_sharedInstance == nil, @"Please don't call [IPC initialize:@\"processName\" twice!");
        _responseHandlers = [NSMutableDictionary new];
        _darwinNotificationCenterRef = CFNotificationCenterGetDarwinNotifyCenter();
        _sharedInstance = [[self alloc] initWithProcessName:processName];       //has to be last because it starts the thread which needs those global vars
    }
}

//return the shared IPC instance, asserts if initializeForProcess: was not called before
+(id) sharedInstance
{
    @synchronized(self) {
        MLAssert(_sharedInstance != nil, @"Please call [IPC initialize:@\"processName\"] first!");
        return _sharedInstance;
    }
}

//stop the server thread of the shared instance (if any) and drop all global state
+(void) terminate
{
    @synchronized(self) {
        //accessing an ivar through "->" on a nil instance is a null pointer dereference
        //--> only touch the instance if there is one (makes terminate safe to call without/before initialization or twice)
        if(_sharedInstance != nil)
        {
            //cancel server thread and wake it up to let it terminate properly
            [_sharedInstance.serverThread cancel];
            [_sharedInstance wakeupServerThread];
        }
        //deallocate everything
        _responseHandlers = nil;
        _sharedInstance = nil;
    }
}

//send a message to a single destination process without expecting a response
-(void) sendMessage:(NSString*) name withData:(NSData* _Nullable) data to:(NSString*) destination
{
    [self sendMessage:name withData:data to:destination withResponseHandler:nil];
}

//send a message to a single destination process and register an optional handler that gets called with the response message
-(void) sendMessage:(NSString*) name withData:(NSData* _Nullable) data to:(NSString*) destination withResponseHandler:(IPC_response_handler_t _Nullable) responseHandler
{
    //a response id of 0 marks this message as "not a response"
    NSNumber* id = [self writeIpcMessage:name withData:data andResponseId:@0 to:destination];
    //save response handler for later execution (if one is specified)
    //NOTE(review): _responseHandlers is also read on the server thread and modified by the cleanup timer without any locking
    //and the handler gets registered only after the message was already written --> please confirm this is safe in practice
    if(responseHandler)
        _responseHandlers[id] = responseHandler;
}

//send a message to all processes (destination "*") without expecting a response
-(void) sendBroadcastMessage:(NSString*) name withData:(NSData* _Nullable) data
{
    [self sendMessage:name withData:data to:@"*" withResponseHandler:nil];
}

//send a message to all processes (destination "*") and register an optional response handler
-(void) sendBroadcastMessage:(NSString*) name withData:(NSData* _Nullable) data withResponseHandler:(IPC_response_handler_t _Nullable) responseHandler
{
    [self sendMessage:name withData:data to:@"*" withResponseHandler:responseHandler];
}

//answer a received message: the response carries the same name, is addressed to the original source
//and references the original message id as response id
-(void) respondToMessage:(NSDictionary*) message withData:(NSData* _Nullable) data
{
    [self writeIpcMessage:message[@"name"] withData:data andResponseId:message[@"id"] to:message[@"source"]];
}

//copy the ipc database file to a uniquely named file in the temporary directory
//returns the path of the copy or nil if the copy (or the surrounding transaction) failed
-(NSString* _Nullable) exportDB
{
    NSFileManager* fileManager = [NSFileManager defaultManager];
    NSString* temporaryFilename = [NSString stringWithFormat:@"ipc_%@.db", [[NSProcessInfo processInfo] globallyUniqueString]];
    NSString* temporaryFilePath = [NSTemporaryDirectory() stringByAppendingPathComponent:temporaryFilename];
    
    //checkpoint db before copying db file
    [self.db checkpointWal];
    
    //this transaction creates a new wal log and makes sure the file copy is atomic/consistent
    BOOL success = [self.db boolWriteTransaction:^{
        //copy db file to temp file
        //the return value is the only reliable failure indicator (the error pointer is only meaningful if the call failed)
        NSError* error = nil;
        if(![fileManager copyItemAtPath:self->_dbFile toPath:temporaryFilePath error:&error])
        {
            DDLogError(@"Could not copy database to export location!");
            return NO;
        }
        return YES;
    }];
    
    if(success)
        return temporaryFilePath;
    return nil;
}

//set up the instance state, create/upgrade the ipc database (once per process) and start the server thread
-(id) initWithProcessName:(NSString*) processName
{
    self = [super init];
    if(!self)
        return nil;
    
    _dbFile = [[HelperTools getContainerURLForPathComponents:@[@"ipc.sqlite"]] path];
    _processName = processName;
    _ipcQueues = [NSMutableDictionary new];
    _serverThreadCondition = [NSCondition new];
    
    static dispatch_once_t once;
    static const int VERSION = 3;
    dispatch_once(&once, ^{
        BOOL fileExists = [[NSFileManager defaultManager] fileExistsAtPath:self->_dbFile];
        
        //first command creates initial database if file does not exist
        //this can not be used inside a transaction --> turn on WAL mode before executing any other db operations
        //this will create the database file and open the database because it is the first MLSQlite call done for this file
        //turning on WAL mode has to be done *outside* of any transactions
        [self.db enableWAL];
        [self.db executeNonQuery:@"PRAGMA secure_delete=on;"];
        
        //needed for sqlite >= 3.26.0 (see https://sqlite.org/lang_altertable.html point 2)
        [self.db executeNonQuery:@"PRAGMA legacy_alter_table=on;"];
        [self.db executeNonQuery:@"PRAGMA foreign_keys=off;"];
        
        //the transaction returns the db version found *before* any upgrade was applied
        NSNumber* version = [self.db idWriteTransaction:^{
            if(!fileExists)
            {
                [self.db executeNonQuery:@"CREATE TABLE ipc(id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, name VARCHAR(255), source VARCHAR(255), destination VARCHAR(255), data BLOB, timeout INTEGER NOT NULL DEFAULT 0);"];
                [self.db executeNonQuery:@"CREATE TABLE versions(name VARCHAR(255) NOT NULL PRIMARY KEY, version INTEGER NOT NULL);"];
                [self.db executeNonQuery:@"INSERT INTO versions (name, version) VALUES('db', '1');"];
            }
            
            //upgrade database version if needed
            NSNumber* version = (NSNumber*)[self.db executeScalar:@"SELECT version FROM versions WHERE name='db';"];
            DDLogInfo(@"IPC db version: %@", version);
            
            if([version integerValue] < 2)
                [self.db executeNonQuery:@"ALTER TABLE ipc ADD COLUMN response_to INTEGER NOT NULL DEFAULT 0;"];
            
            //version 3: do a vacuum and from now on do it on every db upgrade
            //(no schema change needed, the vacuum itself is done after this transaction, see below)
            
            //any upgrade done --> update version table and delete all old ipc messages
            if([version integerValue] < VERSION)
            {
                //always truncate ipc table on version upgrade
                [self.db executeNonQuery:@"DELETE FROM ipc;"];
                [self.db executeNonQuery:@"UPDATE versions SET version=? WHERE name='db';" andArguments:@[@(VERSION)]];
                DDLogInfo(@"IPC db upgraded to version: %d", VERSION);
            }
            return version;
        }];
        
        //vacuum after every upgrade
        if([version integerValue] < VERSION)
            [self.db vacuum];
        
        //turn foreign keys on again
        //needed for sqlite >= 3.26.0 (see https://sqlite.org/lang_altertable.html point 2)
        [self.db executeNonQuery:@"PRAGMA legacy_alter_table=off;"];
        [self.db executeNonQuery:@"PRAGMA foreign_keys=on;"];
    });
    
    //use a dedicated and very high priority thread to make sure this always runs
    _serverThread = [[NSThread alloc] initWithTarget:self selector:@selector(serverThreadMain) object:nil];
    //_serverThread.threadPriority = 1.0;
    _serverThread.qualityOfService = NSQualityOfServiceUserInteractive;
    [_serverThread setName:@"IPCServerThread"];
    [_serverThread start];
    
    return self;
}

//main loop of the server thread: read incoming messages until the thread gets cancelled and dispatch them
//to response handlers (responses) or publish them as kMonalIncomingIPC notifications (everything else)
-(void) serverThreadMain
{
    DDLogInfo(@"Now running IPC server for '%@' with thread priority %f...", _processName, [NSThread threadPriority]);
    
    //register darwin notification handler for "im.monal.ipc.wakeup:<process name>" which is used to wake up readNextMessage using the NSCondition
    CFNotificationCenterAddObserver(_darwinNotificationCenterRef, (__bridge void*) self, &darwinNotificationCenterCallback, (__bridge CFNotificationName)[NSString stringWithFormat:@"im.monal.ipc.wakeup:%@", _processName], NULL, 0);
    //the same for broadcasts
    CFNotificationCenterAddObserver(_darwinNotificationCenterRef, (__bridge void*) self, &darwinNotificationCenterCallback, (__bridge CFNotificationName)@"im.monal.ipc.wakeup:*", NULL, 0);
    
    while(![[NSThread currentThread] isCancelled])
    {
        NSDictionary* message = [self readNextMessage];     //this will be blocking
        if(!message)
            continue;
        DDLogDebug(@"Got IPC message: %@", message);
        
        //use a dedicated serial queue for every IPC receiver to maintain IPC message ordering while not blocking other receivers or this serverThread
        //the receiver is the part of the message name in front of the first dot (names without a dot use the default queue)
        NSArray* parts = [message[@"name"] componentsSeparatedByString:@"."];
        NSString* queueName = [parts objectAtIndex:0];
        if(!queueName || [parts count] < 2)
            queueName = @"_default";
        queueName = [NSString stringWithFormat:@"ipc.queue:%@", queueName];
        if(!_ipcQueues[queueName])
            _ipcQueues[queueName] = dispatch_queue_create([queueName cStringUsingEncoding:NSUTF8StringEncoding], dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_USER_INTERACTIVE, 0));
        
        //handle all responses (don't trigger a kMonalIncomingIPC for responses)
        id responseTo = message[@"response_to"];
        if(responseTo && [responseTo intValue] > 0)
        {
            //call response handler if one is present (ignore the spurious response otherwise)
            IPC_response_handler_t responseHandler = (IPC_response_handler_t)_responseHandlers[responseTo];
            if(responseHandler)
            {
                //responses handlers are only valid for the maximum RTT of messages (+ some safety margin)
                createTimer(MSG_TIMEOUT*2 + 1, (^{
                    [_responseHandlers removeObjectForKey:message[@"response_to"]];
                }));
                dispatch_async(_ipcQueues[queueName], ^{
                    responseHandler(message);
                });
            }
        }
        else
        {
            //publish all non-responses (using the message name as object allows for filtering by ipc message name)
            dispatch_async(_ipcQueues[queueName], ^{
                [[NSNotificationCenter defaultCenter] postNotificationName:kMonalIncomingIPC object:message[@"name"] userInfo:message];
            });
        }
        
        DDLogDebug(@"Handled IPC message: %@", message);
    }
    
    //unregister darwin notification handler
    CFNotificationCenterRemoveObserver(_darwinNotificationCenterRef, (__bridge void*) self, (__bridge CFNotificationName)[NSString stringWithFormat:@"im.monal.ipc.wakeup:%@", _processName], NULL);
    CFNotificationCenterRemoveObserver(_darwinNotificationCenterRef, (__bridge void*) self, (__bridge CFNotificationName)@"im.monal.ipc.wakeup:*", NULL);
    DDLogInfo(@"IPC server for '%@' now terminated", _processName);
}

//called by darwinNotificationCenterCallback() for every wakeup notification addressed to this process (or to everyone)
-(void) incomingDarwinNotification:(NSString*) name
{
    DDLogDebug(@"Got incoming darwin notification: %@", name);
    [self wakeupServerThread];      //wake up server thread to process new messages
}

//record a wakeup request and signal the server thread
//the request is recorded in _wakeupPending while holding the condition lock, so it is not lost
//if the server thread is not (yet) waiting at the time this gets called
-(void) wakeupServerThread
{
    [_serverThreadCondition lock];
    _wakeupPending = YES;
    [_serverThreadCondition signal];
    [_serverThreadCondition unlock];
}

//block until a message for this process is available and return it
//returns nil if the current thread was cancelled
-(NSDictionary*) readNextMessage
{
    while(![[NSThread currentThread] isCancelled])
    {
        NSDictionary* data = [self readIpcMessageFor:_processName];
        if(data)
            return data;
        
        //wait for wakeup (incoming darwin notification or thread termination)
        DDLogVerbose(@"IPC readNextMessage waiting for wakeup via darwin notification");
        //NSCondition has to be locked while waiting and the wait has to be guarded by a predicate:
        //a wakeup requested between the db read above and this wait would be lost otherwise
        [_serverThreadCondition lock];
        while(!_wakeupPending && ![[NSThread currentThread] isCancelled])
            [_serverThreadCondition wait];
        _wakeupPending = NO;
        [_serverThreadCondition unlock];
    }
    return nil;     //thread cancelled
}

//this is the getter of our readonly "db" property always returning the thread-local instance of the MLSQLite class
-(MLSQLite*) db
{
    //always return thread-local instance of sqlite class (this is important for performance!)
    return [MLSQLite sharedInstanceForFile:_dbFile];
}

//fetch the oldest pending message addressed to the given destination (or to everyone) from the ipc table
//returns nil if there is no such message
//NOTE(review): everything in this method after the first DELETE statement was missing from the text this file was restored from
//and has been reconstructed from the table schema and the way messages are used elsewhere in this file
//--> please compare with version control (especially the queries and the executeReader:andArguments: call)
-(NSDictionary*) readIpcMessageFor:(NSString*) destination
{
    return [self.db idWriteTransaction:^{
        NSDictionary* retval = nil;
        
        //delete old entries that timed out
        NSNumber* timestamp = [HelperTools currentTimestampInSeconds];
        [self.db executeNonQuery:@"DELETE FROM ipc WHERE timeout<?;" andArguments:@[timestamp]];
        
        //load a *single* message from table and delete it afterwards
        NSArray* rows = [self.db executeReader:@"SELECT * FROM ipc WHERE destination=? OR destination='*' ORDER BY id ASC LIMIT 1;" andArguments:@[destination]];
        if([rows count])
        {
            retval = rows[0];
            //broadcasts are not deleted here (other processes still have to read them), they get removed by their timeout only
            if(![retval[@"destination"] isEqualToString:@"*"])
                [self.db executeNonQuery:@"DELETE FROM ipc WHERE id=?;" andArguments:@[retval[@"id"]]];
        }
        return retval;
    }];
}

//write a message to the ipc table and wake up the receiving process(es) via darwin notification
//returns the id of the newly inserted message
//NOTE(review): the head of this method (up to and including the start of the INSERT argument list) was missing from the text
//this file was restored from and has been reconstructed from its call sites and the surviving tail
//--> please compare with version control
-(NSNumber*) writeIpcMessage:(NSString*) name withData:(NSData* _Nullable) data andResponseId:(NSNumber*) responseId to:(NSString*) destination
{
    //empty data is default if not specified (nil can not be put into the argument array below)
    if(!data)
        data = [NSData new];
    
    //write message to table
    NSNumber* id = [self.db idWriteTransaction:^{
        //delete old entries that timed out
        NSNumber* timestamp = [HelperTools currentTimestampInSeconds];
        [self.db executeNonQuery:@"DELETE FROM ipc WHERE timeout<?;" andArguments:@[timestamp]];
        
        //save message to table
        NSNumber* timeout = @([timestamp integerValue] + (NSInteger)MSG_TIMEOUT);
        [self.db executeNonQuery:@"INSERT INTO ipc (name, source, destination, data, timeout, response_to) VALUES(?, ?, ?, ?, ?, ?);" andArguments:@[name, self->_processName, destination, data, timeout, responseId]];
        return [self.db lastInsertId];
    }];
    
    //send out darwin notification to wake up other processes waiting for IPC
    //(a broadcast destination of "*" yields the broadcast notification name "im.monal.ipc.wakeup:*")
    CFNotificationCenterPostNotification(CFNotificationCenterGetDarwinNotifyCenter(), (__bridge CFNotificationName)[NSString stringWithFormat:@"im.monal.ipc.wakeup:%@", destination], NULL, NULL, NO);
    
    DDLogDebug(@"Wrote IPC message %@ to database", id);
    return id;
}

@end