merge: Accept Like(Note) and Update(Note) activities where the Note isn't already cached (resolves #795 and #748) (!729)
View MR for information: https://activitypub.software/TransFem-org/Sharkey/-/merge_requests/729 Closes #795 and #748 Approved-by: dakkar <dakkar@thenautilus.net> Approved-by: Marie <github@yuugi.dev>
This commit is contained in:
commit
3ae9f4e8e6
|
@ -165,7 +165,7 @@ export class ApInboxService {
|
||||||
} else if (isAnnounce(activity)) {
|
} else if (isAnnounce(activity)) {
|
||||||
return await this.announce(actor, activity, resolver);
|
return await this.announce(actor, activity, resolver);
|
||||||
} else if (isLike(activity)) {
|
} else if (isLike(activity)) {
|
||||||
return await this.like(actor, activity);
|
return await this.like(actor, activity, resolver);
|
||||||
} else if (isUndo(activity)) {
|
} else if (isUndo(activity)) {
|
||||||
return await this.undo(actor, activity, resolver);
|
return await this.undo(actor, activity, resolver);
|
||||||
} else if (isBlock(activity)) {
|
} else if (isBlock(activity)) {
|
||||||
|
@ -197,10 +197,13 @@ export class ApInboxService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
private async like(actor: MiRemoteUser, activity: ILike): Promise<string> {
|
private async like(actor: MiRemoteUser, activity: ILike, resolver?: Resolver): Promise<string> {
|
||||||
const targetUri = getApId(activity.object);
|
const targetUri = getApId(activity.object);
|
||||||
|
|
||||||
const note = await this.apNoteService.fetchNote(targetUri);
|
const object = fromTuple(activity.object);
|
||||||
|
if (!object) return 'skip: activity has no object property';
|
||||||
|
|
||||||
|
const note = await this.apNoteService.resolveNote(object, { resolver });
|
||||||
if (!note) return `skip: target note not found ${targetUri}`;
|
if (!note) return `skip: target note not found ${targetUri}`;
|
||||||
|
|
||||||
await this.apNoteService.extractEmojis(activity.tag ?? [], actor.host).catch(() => null);
|
await this.apNoteService.extractEmojis(activity.tag ?? [], actor.host).catch(() => null);
|
||||||
|
@ -385,7 +388,7 @@ export class ApInboxService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
private async create(actor: MiRemoteUser, activity: ICreate, resolver?: Resolver): Promise<string | void> {
|
private async create(actor: MiRemoteUser, activity: ICreate | IUpdate, resolver?: Resolver): Promise<string | void> {
|
||||||
const uri = getApId(activity);
|
const uri = getApId(activity);
|
||||||
|
|
||||||
this.logger.info(`Create: ${uri}`);
|
this.logger.info(`Create: ${uri}`);
|
||||||
|
@ -420,14 +423,14 @@ export class ApInboxService {
|
||||||
});
|
});
|
||||||
|
|
||||||
if (isPost(object)) {
|
if (isPost(object)) {
|
||||||
await this.createNote(resolver, actor, object, false, activity);
|
await this.createNote(resolver, actor, object, false);
|
||||||
} else {
|
} else {
|
||||||
return `Unknown type: ${getApType(object)}`;
|
return `Unknown type: ${getApType(object)}`;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
private async createNote(resolver: Resolver, actor: MiRemoteUser, note: IObject, silent = false, activity?: ICreate): Promise<string> {
|
private async createNote(resolver: Resolver, actor: MiRemoteUser, note: IObject, silent = false): Promise<string> {
|
||||||
const uri = getApId(note);
|
const uri = getApId(note);
|
||||||
|
|
||||||
if (typeof note === 'object') {
|
if (typeof note === 'object') {
|
||||||
|
@ -786,7 +789,7 @@ export class ApInboxService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
private async update(actor: MiRemoteUser, activity: IUpdate, resolver?: Resolver): Promise<string> {
|
private async update(actor: MiRemoteUser, activity: IUpdate, resolver?: Resolver): Promise<string | void> {
|
||||||
if (actor.uri !== activity.actor) {
|
if (actor.uri !== activity.actor) {
|
||||||
return 'skip: invalid actor';
|
return 'skip: invalid actor';
|
||||||
}
|
}
|
||||||
|
@ -805,9 +808,19 @@ export class ApInboxService {
|
||||||
await this.apPersonService.updatePerson(actor.uri, resolver, object);
|
await this.apPersonService.updatePerson(actor.uri, resolver, object);
|
||||||
return 'ok: Person updated';
|
return 'ok: Person updated';
|
||||||
} else if (getApType(object) === 'Question') {
|
} else if (getApType(object) === 'Question') {
|
||||||
|
// If we get an Update(Question) for a note that doesn't exist, then create it instead
|
||||||
|
if (!await this.apNoteService.hasNote(object)) {
|
||||||
|
return await this.create(actor, activity, resolver);
|
||||||
|
}
|
||||||
|
|
||||||
await this.apQuestionService.updateQuestion(object, actor, resolver).catch(err => console.error(err));
|
await this.apQuestionService.updateQuestion(object, actor, resolver).catch(err => console.error(err));
|
||||||
return 'ok: Question updated';
|
return 'ok: Question updated';
|
||||||
} else if (isPost(object)) {
|
} else if (isPost(object)) {
|
||||||
|
// If we get an Update(Note) for a note that doesn't exist, then create it instead
|
||||||
|
if (!await this.apNoteService.hasNote(object)) {
|
||||||
|
return await this.create(actor, activity, resolver);
|
||||||
|
}
|
||||||
|
|
||||||
await this.apNoteService.updateNote(object, actor, resolver).catch(err => console.error(err));
|
await this.apNoteService.updateNote(object, actor, resolver).catch(err => console.error(err));
|
||||||
return 'ok: Note updated';
|
return 'ok: Note updated';
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -142,6 +142,15 @@ export class ApNoteService {
|
||||||
return await this.apDbResolverService.getNoteFromApId(object);
|
return await this.apDbResolverService.getNoteFromApId(object);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if the provided object / ID exists in the local database.
|
||||||
|
*/
|
||||||
|
@bindThis
|
||||||
|
public async hasNote(object: string | IObject | [string | IObject]): Promise<boolean> {
|
||||||
|
const uri = getApId(object);
|
||||||
|
return await this.notesRepository.existsBy({ uri });
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Noteを作成します。
|
* Noteを作成します。
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -10,6 +10,7 @@ import type { Config } from '@/config.js';
|
||||||
import { DI } from '@/di-symbols.js';
|
import { DI } from '@/di-symbols.js';
|
||||||
import type Logger from '@/logger.js';
|
import type Logger from '@/logger.js';
|
||||||
import { bindThis } from '@/decorators.js';
|
import { bindThis } from '@/decorators.js';
|
||||||
|
import { StatusError } from '@/misc/status-error.js';
|
||||||
import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js';
|
import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js';
|
||||||
import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js';
|
import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js';
|
||||||
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
|
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
|
||||||
|
@ -132,7 +133,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||||
// 何故かeがundefinedで来ることがある
|
// 何故かeがundefinedで来ることがある
|
||||||
if (!e) return '?';
|
if (!e) return '?';
|
||||||
|
|
||||||
if (e instanceof Bull.UnrecoverableError || e.name === 'AbortError') {
|
if (e instanceof Bull.UnrecoverableError || e.name === 'AbortError' || e instanceof StatusError) {
|
||||||
return `${e.name}: ${e.message}`;
|
return `${e.name}: ${e.message}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,12 +147,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||||
function renderJob(job?: Bull.Job) {
|
function renderJob(job?: Bull.Job) {
|
||||||
if (!job) return '?';
|
if (!job) return '?';
|
||||||
|
|
||||||
return {
|
const info: Record<string, string> = {
|
||||||
name: job.name || undefined,
|
|
||||||
info: getJobInfo(job),
|
info: getJobInfo(job),
|
||||||
failedReason: job.failedReason || undefined,
|
|
||||||
data: job.data,
|
data: job.data,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if (job.name) info.name = job.name;
|
||||||
|
if (job.failedReason) info.failedReason = job.failedReason;
|
||||||
|
|
||||||
|
return info;
|
||||||
}
|
}
|
||||||
|
|
||||||
//#region system
|
//#region system
|
||||||
|
|
|
@ -7,6 +7,7 @@ import { URL } from 'node:url';
|
||||||
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
|
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
|
||||||
import httpSignature from '@peertube/http-signature';
|
import httpSignature from '@peertube/http-signature';
|
||||||
import * as Bull from 'bullmq';
|
import * as Bull from 'bullmq';
|
||||||
|
import { AbortError } from 'node-fetch';
|
||||||
import type Logger from '@/logger.js';
|
import type Logger from '@/logger.js';
|
||||||
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
|
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
|
||||||
import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
|
import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
|
||||||
|
@ -232,6 +233,19 @@ export class InboxProcessorService implements OnApplicationShutdown {
|
||||||
return e.message;
|
return e.message;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (e instanceof StatusError) {
|
||||||
|
if (e.isRetryable) {
|
||||||
|
return `temporary error ${e.statusCode}`;
|
||||||
|
} else {
|
||||||
|
return `skip: permanent error ${e.statusCode}`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (e instanceof AbortError) {
|
||||||
|
return 'request aborted';
|
||||||
|
}
|
||||||
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
return 'ok';
|
return 'ok';
|
||||||
|
|
Loading…
Reference in New Issue